void sync query return val

This commit is contained in:
Minglei Jin 2024-09-25 18:25:40 +08:00
parent e19514e2c4
commit 49a6bf4c23
10 changed files with 194 additions and 187 deletions

View File

@ -32,7 +32,6 @@ extern int32_t tsS3PageCacheSize;
extern int32_t tsS3UploadDelaySec; extern int32_t tsS3UploadDelaySec;
int32_t s3Init(); int32_t s3Init();
void s3CleanUp();
int32_t s3CheckCfg(); int32_t s3CheckCfg();
int32_t s3PutObjectFromFile(const char *file, const char *object); int32_t s3PutObjectFromFile(const char *file, const char *object);
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp); int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp);

View File

@ -26,10 +26,10 @@ extern "C" {
#include "query.h" #include "query.h"
#include "taos.h" #include "taos.h"
#include "tcommon.h" #include "tcommon.h"
#include "tmisce.h"
#include "tdef.h" #include "tdef.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "tmisce.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "trpc.h" #include "trpc.h"
@ -86,7 +86,7 @@ typedef struct {
int8_t threadStop; int8_t threadStop;
int8_t quitByKill; int8_t quitByKill;
TdThread thread; TdThread thread;
TdThreadMutex lock; // used when app init and cleanup TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary; SHashObj* appSummary;
SHashObj* appHbHash; // key: clusterId SHashObj* appHbHash; // key: clusterId
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
@ -95,11 +95,11 @@ typedef struct {
} SClientHbMgr; } SClientHbMgr;
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us int64_t start; // start timestamp, us
int64_t ctgStart; // start to parse, us int64_t ctgStart; // start to parse, us
int64_t execStart; // start to parse, us int64_t execStart; // start to parse, us
int64_t parseCostUs; int64_t parseCostUs;
int64_t ctgCostUs; int64_t ctgCostUs;
int64_t analyseCostUs; int64_t analyseCostUs;
int64_t planCostUs; int64_t planCostUs;
@ -193,7 +193,7 @@ typedef struct SReqResultInfo {
char** convertBuf; char** convertBuf;
TAOS_ROW row; TAOS_ROW row;
SResultColumn* pCol; SResultColumn* pCol;
uint64_t numOfRows; // from int32_t change to int64_t uint64_t numOfRows; // from int32_t change to int64_t
uint64_t totalRows; uint64_t totalRows;
uint64_t current; uint64_t current;
bool localResultFetched; bool localResultFetched;
@ -319,12 +319,14 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source); TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source);
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid); TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source); void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int8_t source);
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid); int64_t reqid);
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param); void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param);
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser, SParseSqlRes* pRes); int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
void syncQueryFn(void* param, void* res, int32_t code); SParseSqlRes* pRes);
void syncQueryFn(void* param, void* res, int32_t code);
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols); int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
@ -333,7 +335,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
return (SReqResultInfo*)&msg->common.resInfo; return (SReqResultInfo*)&msg->common.resInfo;
} }
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo); int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo);
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo); if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
return tmqGetCurResInfo(res); return tmqGetCurResInfo(res);
@ -349,8 +351,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo, int32_t createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo,
STscObj **p); STscObj** p);
void destroyTscObj(void* pObj); void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid); STscObj* acquireTscObj(int64_t rid);
void releaseTscObj(int64_t rid); void releaseTscObj(int64_t rid);
@ -358,7 +360,7 @@ void destroyAppInst(void* pAppInfo);
uint64_t generateRequestId(); uint64_t generateRequestId();
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest); int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj** pRequest);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid); SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid); int32_t releaseRequest(int64_t rid);
@ -372,9 +374,9 @@ void resetConnectDB(STscObj* pTscObj);
int taos_options_imp(TSDB_OPTION option, const char* str); int taos_options_imp(TSDB_OPTION option, const char* str);
int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void **pDnodeConn); int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void** pDnodeConn);
void tscStopCrashReport(); void tscStopCrashReport();
void cleanupAppInfo(); void cleanupAppInfo();
typedef struct AsyncArg { typedef struct AsyncArg {
SRpcMsg msg; SRpcMsg msg;
@ -402,17 +404,17 @@ int32_t hbMgrInit();
void hbMgrCleanUp(); void hbMgrCleanUp();
// cluster level // cluster level
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr); int32_t appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key, SAppHbMgr** pAppHbMgr);
void appHbMgrCleanup(void); void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
void destroyAllRequests(SHashObj* pRequests); void destroyAllRequests(SHashObj* pRequests);
void stopAllRequests(SHashObj* pRequests); void stopAllRequests(SHashObj* pRequests);
//SAppInstInfo* getAppInstInfo(const char* clusterKey); // SAppInstInfo* getAppInstInfo(const char* clusterKey);
// conn level // conn level
int32_t hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); int32_t hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey); void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey);
typedef struct SSqlCallbackWrapper { typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx; SParseContext* pParseCtx;
@ -421,9 +423,9 @@ typedef struct SSqlCallbackWrapper {
void* pPlanInfo; void* pPlanInfo;
} SSqlCallbackWrapper; } SSqlCallbackWrapper;
void setQueryRequest(int64_t rId); void setQueryRequest(int64_t rId);
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper); void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
@ -431,20 +433,21 @@ void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView); int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
int32_t qnodeRequired(SRequestObj* pRequest, bool *required); int32_t qnodeRequired(SRequestObj* pRequest, bool* required);
void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest); void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest);
void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper); void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper);
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code); void handleQueryAnslyseRes(SSqlCallbackWrapper* pWrapper, SMetaData* pResultMeta, int32_t code);
void restartAsyncQuery(SRequestObj *pRequest, int32_t code); void restartAsyncQuery(SRequestObj* pRequest, int32_t code);
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest); int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest);
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce); int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper** ppWrapper, SRequestObj* pRequest, bool updateMetaForce);
void returnToUser(SRequestObj* pRequest); void returnToUser(SRequestObj* pRequest);
void stopAllQueries(SRequestObj *pRequest); void stopAllQueries(SRequestObj* pRequest);
void doRequestCallback(SRequestObj* pRequest, int32_t code); void doRequestCallback(SRequestObj* pRequest, int32_t code);
void freeQueryParam(SSyncQueryParam* param); void freeQueryParam(SSyncQueryParam* param);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes); int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser,
SParseSqlRes* pRes);
#endif #endif
#define TSC_ERR_RET(c) \ #define TSC_ERR_RET(c) \
@ -474,13 +477,9 @@ int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, boo
void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost); void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost);
enum { enum { MONITORSQLTYPESELECT = 0, MONITORSQLTYPEINSERT = 1, MONITORSQLTYPEDELETE = 2 };
MONITORSQLTYPESELECT = 0,
MONITORSQLTYPEINSERT = 1,
MONITORSQLTYPEDELETE = 2
};
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type); void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
void tmqMgmtClose(void); void tmqMgmtClose(void);

View File

@ -1249,7 +1249,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
} }
} }
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) { void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
int32_t code = 0; int32_t code = 0;
if (pQuery->pRoot) { if (pQuery->pRoot) {
@ -1335,8 +1335,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
*res = pRequest->body.resInfo.execRes.res; *res = pRequest->body.resInfo.execRes.res;
pRequest->body.resInfo.execRes.res = NULL; pRequest->body.resInfo.execRes.res = NULL;
} }
return pRequest;
} }
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
@ -2934,8 +2932,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s
return NULL; return NULL;
} }
code = tsem_destroy(&param->sem); code = tsem_destroy(&param->sem);
if(TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
tscError("failed to destroy semaphore since %s", tstrerror(code)); tscError("failed to destroy semaphore since %s", tstrerror(code));
} }
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;

View File

@ -394,7 +394,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
uint8_t tagNum = pCreateReq->ctb.tagNum; uint8_t tagNum = pCreateReq->ctb.tagNum;
int32_t code = 0; int32_t code = 0;
cJSON* tags = NULL; cJSON* tags = NULL;
cJSON* tableName = cJSON_CreateString(name); cJSON* tableName = cJSON_CreateString(name);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
cJSON* using = cJSON_CreateString(sname); cJSON* using = cJSON_CreateString(sname);
@ -417,7 +417,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
} }
char* pJson = NULL; char* pJson = NULL;
parseTagDatatoJson(pTag, &pJson); parseTagDatatoJson(pTag, &pJson);
if(pJson == NULL) { if (pJson == NULL) {
uError("parseTagDatatoJson failed, pJson == NULL"); uError("parseTagDatatoJson failed, pJson == NULL");
goto end; goto end;
} }
@ -731,7 +731,7 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
goto end; goto end;
} }
parseTagDatatoJson(vAlterTbReq.pTagVal, &buf); parseTagDatatoJson(vAlterTbReq.pTagVal, &buf);
if(buf == NULL) { if (buf == NULL) {
uError("parseTagDatatoJson failed, buf == NULL"); uError("parseTagDatatoJson failed, buf == NULL");
goto end; goto end;
} }
@ -978,7 +978,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true; pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
@ -1082,7 +1082,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true; pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
if (pRequest->code == TSDB_CODE_SUCCESS) { if (pRequest->code == TSDB_CODE_SUCCESS) {
// ignore the error code // ignore the error code
@ -1236,7 +1236,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray)); RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS) { if (pRequest->code == TSDB_CODE_SUCCESS) {
RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false)); RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
} }
@ -1365,7 +1365,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
if (TSDB_CODE_SUCCESS != code) goto end; if (TSDB_CODE_SUCCESS != code) goto end;
RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray)); RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS) { if (pRequest->code == TSDB_CODE_SUCCESS) {
RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false)); RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
} }
@ -1510,7 +1510,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
if (TSDB_CODE_SUCCESS != code) goto end; if (TSDB_CODE_SUCCESS != code) goto end;
RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray)); RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
pVgData = NULL; pVgData = NULL;
pArray = NULL; pArray = NULL;
@ -1587,7 +1587,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0)); RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0));
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
@ -1647,7 +1647,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0)); RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0));
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
@ -1766,7 +1766,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
@ -1935,7 +1935,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:

View File

@ -112,7 +112,7 @@ static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo *conn, const char
SUserAuthInfo pAuth = {0}; SUserAuthInfo pAuth = {0};
(void)snprintf(pAuth.user, sizeof(pAuth.user), "%s", info->taos->user); (void)snprintf(pAuth.user, sizeof(pAuth.user), "%s", info->taos->user);
if (NULL == pTabName) { if (NULL == pTabName) {
if (tNameSetDbName(&pAuth.tbName, info->taos->acctId, info->pRequest->pDb, strlen(info->pRequest->pDb)) != 0){ if (tNameSetDbName(&pAuth.tbName, info->taos->acctId, info->pRequest->pDb, strlen(info->pRequest->pDb)) != 0) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
} else { } else {
@ -165,7 +165,7 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
return convertTimePrecision(tsInt64, fromPrecision, toPrecision); return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
} }
int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo** tInfo) { int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo **tInfo) {
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1); SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
if (!tag) { if (!tag) {
return terrno; return terrno;
@ -203,13 +203,13 @@ static void smlDestroySTableMeta(void *para) {
taosMemoryFree(meta); taosMemoryFree(meta);
} }
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta** sMeta) { int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta **sMeta) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char *measure = currElement->measure; char *measure = currElement->measure;
int measureLen = currElement->measureLen; int measureLen = currElement->measureLen;
if (currElement->measureEscaped) { if (currElement->measureEscaped) {
measure = (char *)taosMemoryMalloc(measureLen); measure = (char *)taosMemoryMalloc(measureLen);
if (measure == NULL){ if (measure == NULL) {
return terrno; return terrno;
} }
(void)memcpy(measure, currElement->measure, measureLen); (void)memcpy(measure, currElement->measure, measureLen);
@ -233,7 +233,7 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml
} }
(*sMeta)->tableMeta = pTableMeta; (*sMeta)->tableMeta = pTableMeta;
code = taosHashPut(info->superTables, currElement->measure, currElement->measureLen, sMeta, POINTER_BYTES); code = taosHashPut(info->superTables, currElement->measure, currElement->measureLen, sMeta, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
smlDestroySTableMeta(*sMeta); smlDestroySTableMeta(*sMeta);
return code; return code;
} }
@ -250,11 +250,11 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml
} }
if (i < pTableMeta->tableInfo.numOfColumns) { if (i < pTableMeta->tableInfo.numOfColumns) {
if(taosArrayPush((*sMeta)->cols, &kv) == NULL){ if (taosArrayPush((*sMeta)->cols, &kv) == NULL) {
return terrno; return terrno;
} }
} else { } else {
if(taosArrayPush((*sMeta)->tags, &kv) == NULL){ if (taosArrayPush((*sMeta)->tags, &kv) == NULL) {
return terrno; return terrno;
} }
} }
@ -277,7 +277,7 @@ bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv) {
goto END; goto END;
} }
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt);
if (maxKV == NULL){ if (maxKV == NULL) {
goto END; goto END;
} }
if (unlikely(!IS_SAME_KEY)) { if (unlikely(!IS_SAME_KEY)) {
@ -336,9 +336,9 @@ int32_t smlJoinMeasureTag(SSmlLineInfo *elements) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool smlIsPKTable(STableMeta *pTableMeta){ static bool smlIsPKTable(STableMeta *pTableMeta) {
for(int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){ for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
if(pTableMeta->schema[i].flags & COL_IS_KEY){ if (pTableMeta->schema[i].flags & COL_IS_KEY) {
return true; return true;
} }
} }
@ -368,14 +368,14 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
info->maxTagKVs = sMeta->tags; info->maxTagKVs = sMeta->tags;
info->maxColKVs = sMeta->cols; info->maxColKVs = sMeta->cols;
if(smlIsPKTable(sMeta->tableMeta)){ if (smlIsPKTable(sMeta->tableMeta)) {
return TSDB_CODE_SML_NOT_SUPPORT_PK; return TSDB_CODE_SML_NOT_SUPPORT_PK;
} }
return 0; return 0;
} }
int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) { int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSmlTableInfo **oneTable = SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen); (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen);
SSmlTableInfo *tinfo = NULL; SSmlTableInfo *tinfo = NULL;
@ -385,19 +385,19 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
return code; return code;
} }
code = taosHashPut(info->childTables, elements->measureTag, elements->measureTagsLen, &tinfo, POINTER_BYTES); code = taosHashPut(info->childTables, elements->measureTag, elements->measureTagsLen, &tinfo, POINTER_BYTES);
if(code != 0){ if (code != 0) {
smlDestroyTableInfo(&tinfo); smlDestroyTableInfo(&tinfo);
return code; return code;
} }
tinfo->tags = taosArrayDup(info->preLineTagKV, NULL); tinfo->tags = taosArrayDup(info->preLineTagKV, NULL);
if(tinfo->tags == NULL){ if (tinfo->tags == NULL) {
smlDestroyTableInfo(&tinfo); smlDestroyTableInfo(&tinfo);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
for (size_t i = 0; i < taosArrayGetSize(info->preLineTagKV); i++) { for (size_t i = 0; i < taosArrayGetSize(info->preLineTagKV); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(info->preLineTagKV, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(info->preLineTagKV, i);
if(kv == NULL){ if (kv == NULL) {
smlDestroyTableInfo(&tinfo); smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -406,12 +406,12 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
} }
code = smlSetCTableName(tinfo, info->tbnameKey); code = smlSetCTableName(tinfo, info->tbnameKey);
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
smlDestroyTableInfo(&tinfo); smlDestroyTableInfo(&tinfo);
return code; return code;
} }
code = getTableUid(info, elements, tinfo); code = getTableUid(info, elements, tinfo);
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
smlDestroyTableInfo(&tinfo); smlDestroyTableInfo(&tinfo);
return code; return code;
} }
@ -458,10 +458,10 @@ int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *
return terrno; return terrno;
} }
} }
if (taosArrayPush(elements->colArray, kvTs) == NULL){ if (taosArrayPush(elements->colArray, kvTs) == NULL) {
return terrno; return terrno;
} }
if (taosArrayPush(elements->colArray, kv) == NULL){ if (taosArrayPush(elements->colArray, kv) == NULL) {
return terrno; return terrno;
} }
} }
@ -495,11 +495,11 @@ int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs)
static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnameKey) { static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnameKey) {
bool autoChildName = false; bool autoChildName = false;
size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter); size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter);
if(delimiter > 0 && tbnameKey == NULL){ if (delimiter > 0 && tbnameKey == NULL) {
size_t totalNameLen = delimiter * (taosArrayGetSize(tags) - 1); size_t totalNameLen = delimiter * (taosArrayGetSize(tags) - 1);
for (int i = 0; i < taosArrayGetSize(tags); i++) { for (int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i); SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
if(tag == NULL){ if (tag == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
totalNameLen += tag->length; totalNameLen += tag->length;
@ -512,7 +512,7 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
(void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN); (void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
for (int i = 0; i < taosArrayGetSize(tags); i++) { for (int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i); SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
if(tag == NULL){ if (tag == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
(void)strncat(childTableName, tag->value, tag->length); (void)strncat(childTableName, tag->value, tag->length);
@ -523,8 +523,8 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
if (tsSmlDot2Underline) { if (tsSmlDot2Underline) {
smlStrReplace(childTableName, strlen(childTableName)); smlStrReplace(childTableName, strlen(childTableName));
} }
}else{ } else {
if (tbnameKey == NULL){ if (tbnameKey == NULL) {
tbnameKey = tsSmlChildTableName; tbnameKey = tsSmlChildTableName;
} }
size_t childTableNameLen = strlen(tbnameKey); size_t childTableNameLen = strlen(tbnameKey);
@ -532,13 +532,14 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
for (int i = 0; i < taosArrayGetSize(tags); i++) { for (int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i); SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
if(tag == NULL){ if (tag == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
// handle child table name // handle child table name
if (childTableNameLen == tag->keyLen && strncmp(tag->key, tbnameKey, tag->keyLen) == 0) { if (childTableNameLen == tag->keyLen && strncmp(tag->key, tbnameKey, tag->keyLen) == 0) {
(void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN); (void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
(void)strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN)); (void)strncpy(childTableName, tag->value,
(tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
if (tsSmlDot2Underline) { if (tsSmlDot2Underline) {
smlStrReplace(childTableName, strlen(childTableName)); smlStrReplace(childTableName, strlen(childTableName));
} }
@ -553,7 +554,7 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) { int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
int32_t code = smlParseTableName(oneTable->tags, oneTable->childTableName, tbnameKey); int32_t code = smlParseTableName(oneTable->tags, oneTable->childTableName, tbnameKey);
if(code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -562,7 +563,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
if (dst == NULL) { if (dst == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if(oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN){ if (oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN) {
uError("SML:smlSetCTableName super table name is too long"); uError("SML:smlSetCTableName super table name is too long");
taosArrayDestroy(dst); taosArrayDestroy(dst);
return TSDB_CODE_SML_INTERNAL_ERROR; return TSDB_CODE_SML_INTERNAL_ERROR;
@ -578,7 +579,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
} }
code = buildChildTableName(&rName); code = buildChildTableName(&rName);
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
taosArrayDestroy(dst); taosArrayDestroy(dst);
@ -906,13 +907,13 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
return result; return result;
} }
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *checkDumplicateCols, static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
ESchemaAction *action, bool isTag) { SArray *checkDumplicateCols, ESchemaAction *action, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) { for (int j = 0; j < taosArrayGetSize(cols); ++j) {
if (j == 0 && !isTag) continue; if (j == 0 && !isTag) continue;
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
if (kv == NULL){ if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info); code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
@ -923,10 +924,10 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
for (int j = 0; j < taosArrayGetSize(checkDumplicateCols); ++j) { for (int j = 0; j < taosArrayGetSize(checkDumplicateCols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(checkDumplicateCols, j); SSmlKv *kv = (SSmlKv *)taosArrayGet(checkDumplicateCols, j);
if (kv == NULL){ if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if(taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL){ if (taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN; return TSDB_CODE_PAR_DUPLICATED_COLUMN;
} }
} }
@ -934,16 +935,16 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
} }
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) { static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (hashTmp == NULL) { if (hashTmp == NULL) {
code = terrno; code = terrno;
goto END; goto END;
} }
int32_t i = 0; int32_t i = 0;
for (; i < length; i++) { for (; i < length; i++) {
code = taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); code = taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
if (code != 0){ if (code != 0) {
goto END; goto END;
} }
} }
@ -955,7 +956,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
} }
for (; i < taosArrayGetSize(cols); i++) { for (; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){ if (kv == NULL) {
code = TSDB_CODE_SML_INVALID_DATA; code = TSDB_CODE_SML_INVALID_DATA;
goto END; goto END;
} }
@ -982,8 +983,8 @@ static int32_t getBytes(uint8_t type, int32_t length) {
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *results, int32_t numOfCols, bool isTag) { SArray *results, int32_t numOfCols, bool isTag) {
for (int j = 0; j < taosArrayGetSize(cols); ++j) { for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
if (kv == NULL){ if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
ESchemaAction action = SCHEMA_ACTION_NULL; ESchemaAction action = SCHEMA_ACTION_NULL;
@ -996,7 +997,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
field.type = kv->type; field.type = kv->type;
field.bytes = getBytes(kv->type, kv->length); field.bytes = getBytes(kv->type, kv->length);
(void)memcpy(field.name, kv->key, kv->keyLen); (void)memcpy(field.name, kv->key, kv->keyLen);
if (taosArrayPush(results, &field) == NULL){ if (taosArrayPush(results, &field) == NULL) {
return terrno; return terrno;
} }
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
@ -1008,7 +1009,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
uint16_t newIndex = *index; uint16_t newIndex = *index;
if (isTag) newIndex -= numOfCols; if (isTag) newIndex -= numOfCols;
SField *field = (SField *)taosArrayGet(results, newIndex); SField *field = (SField *)taosArrayGet(results, newIndex);
if (field == NULL){ if (field == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
field->bytes = getBytes(kv->type, kv->length); field->bytes = getBytes(kv->type, kv->length);
@ -1019,7 +1020,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
int32_t len = 0; int32_t len = 0;
for (int j = 0; j < taosArrayGetSize(results); ++j) { for (int j = 0; j < taosArrayGetSize(results); ++j) {
SField *field = taosArrayGet(results, j); SField *field = taosArrayGet(results, j);
if (field == NULL){ if (field == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
len += field->bytes; len += field->bytes;
@ -1051,14 +1052,14 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
} }
for (int32_t i = 0; i < pReq.numOfColumns; ++i) { for (int32_t i = 0; i < pReq.numOfColumns; ++i) {
SField *pField = taosArrayGet(pColumns, i); SField *pField = taosArrayGet(pColumns, i);
if (pField == NULL){ if (pField == NULL) {
code = TSDB_CODE_SML_INVALID_DATA; code = TSDB_CODE_SML_INVALID_DATA;
goto end; goto end;
} }
SFieldWithOptions fieldWithOption = {0}; SFieldWithOptions fieldWithOption = {0};
setFieldWithOptions(&fieldWithOption, pField); setFieldWithOptions(&fieldWithOption, pField);
setDefaultOptionsForField(&fieldWithOption); setDefaultOptionsForField(&fieldWithOption);
if (taosArrayPush(pReq.pColumns, &fieldWithOption) == NULL){ if (taosArrayPush(pReq.pColumns, &fieldWithOption) == NULL) {
code = terrno; code = terrno;
goto end; goto end;
} }
@ -1105,7 +1106,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
field.type = TSDB_DATA_TYPE_NCHAR; field.type = TSDB_DATA_TYPE_NCHAR;
field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
tstrncpy(field.name, tsSmlTagName, sizeof(field.name)); tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
if (taosArrayPush(pReq.pTags, &field) == NULL){ if (taosArrayPush(pReq.pTags, &field) == NULL) {
code = terrno; code = terrno;
goto end; goto end;
} }
@ -1121,7 +1122,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp); pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
pCmdMsg.msgType = TDMT_MND_CREATE_STB; pCmdMsg.msgType = TDMT_MND_CREATE_STB;
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq); pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
if (pCmdMsg.msgLen < 0){ if (pCmdMsg.msgLen < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
@ -1131,7 +1132,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
goto end; goto end;
} }
if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) < 0){ if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
goto end; goto end;
@ -1144,11 +1145,11 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true; pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // no need to check return value launchQueryImpl(pRequest, &pQuery, true, NULL); // no need to check return value
if (pRequest->code == TSDB_CODE_SUCCESS) { if (pRequest->code == TSDB_CODE_SUCCESS) {
code = catalogRemoveTableMeta(info->pCatalog, pName); code = catalogRemoveTableMeta(info->pCatalog, pName);
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
} }
@ -1187,7 +1188,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
size_t superTableLen = 0; size_t superTableLen = 0;
void *superTable = taosHashGetKey(tmp, &superTableLen); void *superTable = taosHashGetKey(tmp, &superTableLen);
char *measure = taosMemoryMalloc(superTableLen); char *measure = taosMemoryMalloc(superTableLen);
if (measure == NULL){ if (measure == NULL) {
code = terrno; code = terrno;
goto end; goto end;
} }
@ -1246,28 +1247,28 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end; goto end;
} }
} else if (code == TSDB_CODE_SUCCESS) { } else if (code == TSDB_CODE_SUCCESS) {
if (smlIsPKTable(pTableMeta)) {
if(smlIsPKTable(pTableMeta)){
code = TSDB_CODE_SML_NOT_SUPPORT_PK; code = TSDB_CODE_SML_NOT_SUPPORT_PK;
goto end; goto end;
} }
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
HASH_NO_LOCK); HASH_NO_LOCK);
if (hashTmp == NULL){ if (hashTmp == NULL) {
code = terrno; code = terrno;
goto end; goto end;
} }
for (uint16_t i = pTableMeta->tableInfo.numOfColumns; for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) { i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
code = taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES); code = taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
if (code != 0){ if (code != 0) {
goto end; goto end;
} }
} }
ESchemaAction action = SCHEMA_ACTION_NULL; ESchemaAction action = SCHEMA_ACTION_NULL;
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true); code =
smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
@ -1280,13 +1281,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
action); action);
SArray *pColumns = SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
if (pColumns == NULL){ if (pColumns == NULL) {
code = terrno; code = terrno;
goto end; goto end;
} }
SArray *pTags = SArray *pTags =
taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
if (pTags == NULL){ if (pTags == NULL) {
taosArrayDestroy(pColumns); taosArrayDestroy(pColumns);
code = terrno; code = terrno;
goto end; goto end;
@ -1297,14 +1298,14 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
field.bytes = pTableMeta->schema[i].bytes; field.bytes = pTableMeta->schema[i].bytes;
tstrncpy(field.name, pTableMeta->schema[i].name, sizeof(field.name)); tstrncpy(field.name, pTableMeta->schema[i].name, sizeof(field.name));
if (i < pTableMeta->tableInfo.numOfColumns) { if (i < pTableMeta->tableInfo.numOfColumns) {
if (taosArrayPush(pColumns, &field) == NULL){ if (taosArrayPush(pColumns, &field) == NULL) {
taosArrayDestroy(pColumns); taosArrayDestroy(pColumns);
taosArrayDestroy(pTags); taosArrayDestroy(pTags);
code = terrno; code = terrno;
goto end; goto end;
} }
} else { } else {
if (taosArrayPush(pTags, &field) == NULL){ if (taosArrayPush(pTags, &field) == NULL) {
taosArrayDestroy(pColumns); taosArrayDestroy(pColumns);
taosArrayDestroy(pTags); taosArrayDestroy(pTags);
code = terrno; code = terrno;
@ -1363,7 +1364,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
} }
} }
action = SCHEMA_ACTION_NULL; action = SCHEMA_ACTION_NULL;
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false); code =
smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
@ -1376,13 +1378,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
action); action);
SArray *pColumns = SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
if (pColumns == NULL){ if (pColumns == NULL) {
code = terrno; code = terrno;
goto end; goto end;
} }
SArray *pTags = SArray *pTags =
taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
if (pTags == NULL){ if (pTags == NULL) {
taosArrayDestroy(pColumns); taosArrayDestroy(pColumns);
code = terrno; code = terrno;
goto end; goto end;
@ -1393,14 +1395,14 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
field.bytes = pTableMeta->schema[i].bytes; field.bytes = pTableMeta->schema[i].bytes;
tstrncpy(field.name, pTableMeta->schema[i].name, sizeof(field.name)); tstrncpy(field.name, pTableMeta->schema[i].name, sizeof(field.name));
if (i < pTableMeta->tableInfo.numOfColumns) { if (i < pTableMeta->tableInfo.numOfColumns) {
if (taosArrayPush(pColumns, &field) == NULL){ if (taosArrayPush(pColumns, &field) == NULL) {
taosArrayDestroy(pColumns); taosArrayDestroy(pColumns);
taosArrayDestroy(pTags); taosArrayDestroy(pTags);
code = terrno; code = terrno;
goto end; goto end;
} }
} else { } else {
if (taosArrayPush(pTags, &field) == NULL){ if (taosArrayPush(pTags, &field) == NULL) {
taosArrayDestroy(pColumns); taosArrayDestroy(pColumns);
taosArrayDestroy(pTags); taosArrayDestroy(pTags);
code = terrno; code = terrno;
@ -1483,7 +1485,7 @@ end:
taosHashCancelIterate(info->superTables, tmp); taosHashCancelIterate(info->superTables, tmp);
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
(void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error (void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code,
tstrerror(code), info->dataFormat, info->needModifySchema); tstrerror(code), info->dataFormat, info->needModifySchema);
@ -1494,34 +1496,35 @@ static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
terrno = 0; terrno = 0;
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){ if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES); int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
if (ret == 0) { if (ret == 0) {
if (taosArrayPush(metaArray, kv) == NULL){ if (taosArrayPush(metaArray, kv) == NULL) {
return terrno; return terrno;
} }
if(taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) { if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN; return TSDB_CODE_PAR_DUPLICATED_COLUMN;
} }
}else if(terrno == TSDB_CODE_DUP_KEY){ } else if (terrno == TSDB_CODE_DUP_KEY) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN; return TSDB_CODE_PAR_DUPLICATED_COLUMN;
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg, SHashObj* checkDuplicate) { static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg,
SHashObj *checkDuplicate) {
for (int i = 0; i < taosArrayGetSize(cols); ++i) { for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){ if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen); int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
if (index) { if (index) {
SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index); SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
if (value == NULL){ if (value == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -1549,13 +1552,13 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
int16_t size = tmp; int16_t size = tmp;
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES); int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
if (ret == 0) { if (ret == 0) {
if(taosArrayPush(metaArray, kv) == NULL){ if (taosArrayPush(metaArray, kv) == NULL) {
return terrno; return terrno;
} }
if(taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) { if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN; return TSDB_CODE_PAR_DUPLICATED_COLUMN;
} }
}else{ } else {
return ret; return ret;
} }
} }
@ -1586,7 +1589,7 @@ void freeSSmlKv(void *data) {
void smlDestroyInfo(SSmlHandle *info) { void smlDestroyInfo(SSmlHandle *info) {
if (!info) return; if (!info) return;
// qDestroyQuery(info->pQuery); // qDestroyQuery(info->pQuery);
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables); taosHashCleanup(info->childTables);
@ -1657,7 +1660,7 @@ int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) {
info->id = smlGenId(); info->id = smlGenId();
code = smlInitHandle(&info->pQuery); code = smlInitHandle(&info->pQuery);
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
goto FAILED; goto FAILED;
} }
info->dataFormat = true; info->dataFormat = true;
@ -1688,7 +1691,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
} }
for (size_t i = 0; i < taosArrayGetSize(cols); i++) { for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){ if (kv == NULL) {
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -1698,7 +1701,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
return TSDB_CODE_PAR_DUPLICATED_COLUMN; return TSDB_CODE_PAR_DUPLICATED_COLUMN;
} }
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
return code; return code;
} }
@ -1759,9 +1762,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
if (tableMeta) { // update meta if (tableMeta) { // update meta
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat,
info->lineNum); info->lineNum);
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf, (*tableMeta)->tagHash); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf,
(*tableMeta)->tagHash);
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf, (*tableMeta)->colHash); ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf,
(*tableMeta)->colHash);
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed, ret:%d", info->id, ret); uError("SML:0x%" PRIx64 " smlUpdateMeta failed, ret:%d", info->id, ret);
@ -1801,17 +1806,17 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (info->pRequest->dbList == NULL) { if (info->pRequest->dbList == NULL) {
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
if (info->pRequest->dbList == NULL){ if (info->pRequest->dbList == NULL) {
return terrno; return terrno;
} }
} }
char *data = (char *)taosArrayReserve(info->pRequest->dbList, 1); char *data = (char *)taosArrayReserve(info->pRequest->dbList, 1);
if (data == NULL){ if (data == NULL) {
return terrno; return terrno;
} }
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
(void)tNameGetFullDbName(&pName, data); //ignore (void)tNameGetFullDbName(&pName, data); // ignore
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) { while (oneTable) {
@ -1819,7 +1824,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
int measureLen = tableData->sTableNameLen; int measureLen = tableData->sTableNameLen;
char *measure = (char *)taosMemoryMalloc(tableData->sTableNameLen); char *measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
if (measure == NULL){ if (measure == NULL) {
return terrno; return terrno;
} }
(void)memcpy(measure, tableData->sTableName, tableData->sTableNameLen); (void)memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
@ -1830,11 +1835,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (info->pRequest->tableList == NULL) { if (info->pRequest->tableList == NULL) {
info->pRequest->tableList = taosArrayInit(1, sizeof(SName)); info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
if (info->pRequest->tableList == NULL){ if (info->pRequest->tableList == NULL) {
return terrno; return terrno;
} }
} }
if (taosArrayPush(info->pRequest->tableList, &pName) == NULL){ if (taosArrayPush(info->pRequest->tableList, &pName) == NULL) {
return terrno; return terrno;
} }
@ -1862,7 +1867,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
return code; return code;
} }
code = taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); code = taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " taosHashPut failed. table name: %s", info->id, tableData->childTableName); uError("SML:0x%" PRIx64 " taosHashPut failed. table name: %s", info->id, tableData->childTableName);
taosMemoryFree(measure); taosMemoryFree(measure);
taosHashCancelIterate(info->childTables, oneTable); taosHashCancelIterate(info->childTables, oneTable);
@ -1904,9 +1909,9 @@ static int32_t smlInsertData(SSmlHandle *info) {
info->cost.insertRpcTime = taosGetTimestampUs(); info->cost.insertRpcTime = taosGetTimestampUs();
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary; SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); // no need to check return code (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); // no need to check return code
(void)launchQueryImpl(info->pRequest, info->pQuery, true, NULL); // no need to check return code launchQueryImpl(info->pRequest, info->pQuery, true, NULL); // no need to check return code
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code,
tstrerror(info->pRequest->code)); tstrerror(info->pRequest->code));
@ -1975,12 +1980,12 @@ static bool getLine(SSmlHandle *info, char *lines[], char **rawLine, char *rawLi
if (*rawLine != NULL && (uDebugFlag & DEBUG_DEBUG)) { if (*rawLine != NULL && (uDebugFlag & DEBUG_DEBUG)) {
char *print = taosMemoryCalloc(*len + 1, 1); char *print = taosMemoryCalloc(*len + 1, 1);
if (print != NULL){ if (print != NULL) {
(void)memcpy(print, *tmp, *len); (void)memcpy(print, *tmp, *len);
uDebug("SML:0x%" PRIx64 " smlParseLine is raw, numLines:%d, protocol:%d, len:%d, data:%s", info->id, numLines, uDebug("SML:0x%" PRIx64 " smlParseLine is raw, numLines:%d, protocol:%d, len:%d, data:%s", info->id, numLines,
info->protocol, *len, print); info->protocol, *len, print);
taosMemoryFree(print); taosMemoryFree(print);
} else{ } else {
uError("SML:0x%" PRIx64 " smlParseLine taosMemoryCalloc failed", info->id); uError("SML:0x%" PRIx64 " smlParseLine taosMemoryCalloc failed", info->id);
} }
} else { } else {
@ -2228,7 +2233,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
uInfo("SML:%" PRIx64 " retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, uInfo("SML:%" PRIx64 " retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code,
tstrerror(code)); tstrerror(code));
code = refreshMeta(request->pTscObj, request); code = refreshMeta(request->pTscObj, request);
if (code != 0){ if (code != 0) {
uInfo("SML:%" PRIx64 " refresh meta error code:%d, msg:%s", info->id, code, tstrerror(code)); uInfo("SML:%" PRIx64 " refresh meta error code:%d, msg:%s", info->id, code, tstrerror(code));
} }
smlDestroyInfo(info); smlDestroyInfo(info);
@ -2266,7 +2271,7 @@ end:
*/ */
TAOS_RES *taos_schemaless_insert_ttl_with_reqid_tbname_key(TAOS *taos, char *lines[], int numLines, int protocol, TAOS_RES *taos_schemaless_insert_ttl_with_reqid_tbname_key(TAOS *taos, char *lines[], int numLines, int protocol,
int precision, int32_t ttl, int64_t reqid, char *tbnameKey){ int precision, int32_t ttl, int64_t reqid, char *tbnameKey) {
return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid, tbnameKey); return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid, tbnameKey);
} }
@ -2306,14 +2311,17 @@ static void getRawLineLen(char *lines, int len, int32_t *totalRows, int protocol
} }
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows, TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows,
int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey){ int protocol, int precision, int32_t ttl, int64_t reqid,
char *tbnameKey) {
getRawLineLen(lines, len, totalRows, protocol); getRawLineLen(lines, len, totalRows, protocol);
return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid, tbnameKey); return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid,
tbnameKey);
} }
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision, int32_t ttl, int64_t reqid) { int precision, int32_t ttl, int64_t reqid) {
return taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(taos, lines, len, totalRows, protocol, precision, ttl, reqid, NULL); return taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(taos, lines, len, totalRows, protocol, precision, ttl,
reqid, NULL);
} }
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,

View File

@ -25,7 +25,7 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
return terrno; return terrno;
} }
if(taosArrayPush(pTblBuf->pBufList, &buff) == NULL){ if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
return terrno; return terrno;
} }
@ -224,8 +224,8 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
bool autoCreateTbl) { bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(tbName, tbFName); int32_t code = tNameExtractFullName(tbName, tbFName);
if (code != 0){ if (code != 0) {
return code; return code;
} }
@ -772,7 +772,7 @@ void* stmtBindThreadFunc(void* param) {
} }
int ret = stmtAsyncOutput(pStmt, asyncParam); int ret = stmtAsyncOutput(pStmt, asyncParam);
if (ret != 0){ if (ret != 0) {
qError("stmtAsyncOutput failed, reason:%s", tstrerror(ret)); qError("stmtAsyncOutput failed, reason:%s", tstrerror(ret));
} }
} }
@ -821,7 +821,7 @@ int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
return terrno; return terrno;
} }
if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL){ if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
return terrno; return terrno;
} }
@ -967,7 +967,7 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
} }
int stmtSetDbName(TAOS_STMT* stmt, const char* dbName) { int stmtSetDbName(TAOS_STMT* stmt, const char* dbName) {
STscStmt *pStmt = (STscStmt *) stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_DLOG("start to set dbName: %s", dbName); STMT_DLOG("start to set dbName: %s", dbName);
@ -1045,7 +1045,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
SBoundColInfo *tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags; SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) { if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
tscWarn("no tags bound in sql, will not bound tags"); tscWarn("no tags bound in sql, will not bound tags");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1192,7 +1192,7 @@ static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray**
return terrno; return terrno;
} }
if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL){ if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
return terrno; return terrno;
} }
} }
@ -1216,7 +1216,6 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
@ -1256,7 +1255,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
if (pStmt->sql.pQuery->haveResultSet) { if (pStmt->sql.pQuery->haveResultSet) {
STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema, STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols)); pStmt->sql.pQuery->numOfResCols));
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema); taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision); setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
} }
@ -1549,7 +1548,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else { } else {
if (pStmt->sql.stbInterlaceMode) { if (pStmt->sql.stbInterlaceMode) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
@ -1571,7 +1570,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
} }
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} }
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {

View File

@ -1645,7 +1645,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
__taos_async_fn_t fp = pStmt->options.asyncExecFn; __taos_async_fn_t fp = pStmt->options.asyncExecFn;
if (!fp) { if (!fp) {
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest); code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);

View File

@ -63,13 +63,10 @@ int32_t s3Begin() {
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
void s3End() { (void)S3_deinitialize(); } void s3End() { S3_deinitialize(); }
int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ } int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ }
void s3CleanUp() { /*s3End();*/
}
static int32_t s3ListBucket(char const *bucketname); static int32_t s3ListBucket(char const *bucketname);
static void s3DumpCfgByEp(int8_t epIndex) { static void s3DumpCfgByEp(int8_t epIndex) {
@ -506,7 +503,9 @@ S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
} }
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) { S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) {
(void)responsePropertiesCallbackNull(properties, callbackData); if (S3StatusOK != responsePropertiesCallbackNull(properties, callbackData)) {
uError("%s failed at line %d to process null callback.", __func__, __LINE__);
}
MultipartPartData *data = (MultipartPartData *)callbackData; MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq; int seq = data->seq;
@ -517,7 +516,9 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti
} }
S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) { S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) {
(void)responsePropertiesCallbackNull(properties, callbackData); if (S3StatusOK != responsePropertiesCallbackNull(properties, callbackData)) {
uError("%s failed at line %d to process null callback.", __func__, __LINE__);
}
MultipartPartData *data = (MultipartPartData *)callbackData; MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq; int seq = data->seq;
@ -950,7 +951,9 @@ _exit:
} }
if (cp.thefile) { if (cp.thefile) {
(void)cos_cp_close(cp.thefile); if (cos_cp_close(cp.thefile)) {
uError("%s failed at line %d to close cp file.", __func__, lino);
}
} }
if (cp.parts) { if (cp.parts) {
taosMemoryFree(cp.parts); taosMemoryFree(cp.parts);
@ -1290,7 +1293,10 @@ int32_t s3DeleteObjects(const char *object_name[], int nobject) {
void s3DeleteObjectsByPrefix(const char *prefix) { void s3DeleteObjectsByPrefix(const char *prefix) {
SArray *objectArray = getListByPrefix(prefix); SArray *objectArray = getListByPrefix(prefix);
if (objectArray == NULL) return; if (objectArray == NULL) return;
(void)s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray)); int32_t code = s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
if (!code) {
uError("%s failed at line %d since %s.", __func__, __LINE__, tstrerror(code));
}
taosArrayDestroyEx(objectArray, s3FreeObjectKey); taosArrayDestroyEx(objectArray, s3FreeObjectKey);
} }
@ -1537,7 +1543,7 @@ int32_t s3Init() {
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
void s3CleanUp() { cos_http_io_deinitialize(); } // void s3CleanUp() { cos_http_io_deinitialize(); }
static void log_status(cos_status_t *s) { static void log_status(cos_status_t *s) {
cos_warn_log("status->code: %d", s->code); cos_warn_log("status->code: %d", s->code);
@ -1961,7 +1967,6 @@ long s3Size(const char *object_name) {
#else #else
int32_t s3Init() { return 0; } int32_t s3Init() { return 0; }
void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { return 0; } int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { return 0; }

View File

@ -309,7 +309,7 @@ int32_t cos_cp_dump(SCheckpoint* cp) {
if (!item) { if (!item) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
} }
cJSON_AddItemToArray(ajson, item); if (!cJSON_AddItemToArray(ajson, item)) goto _exit;
if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) { if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);

View File

@ -43,7 +43,6 @@
// } // }
// strcpy(tsSnodeAddress, "127.0.0.1"); // strcpy(tsSnodeAddress, "127.0.0.1");
// int ret = RUN_ALL_TESTS(); // int ret = RUN_ALL_TESTS();
// s3CleanUp();
// return ret; // return ret;
// } // }