Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TS-4331-3.0

This commit is contained in:
Hongze Cheng 2024-07-08 14:32:07 +08:00
commit 9edfc82bf4
29 changed files with 305 additions and 224 deletions

View File

@ -134,7 +134,6 @@ extern uint16_t tsMonitorPort;
extern int32_t tsMonitorMaxLogs; extern int32_t tsMonitorMaxLogs;
extern bool tsMonitorComp; extern bool tsMonitorComp;
extern bool tsMonitorLogProtocol; extern bool tsMonitorLogProtocol;
extern int32_t tsMonitorIntervalForBasic;
extern bool tsMonitorForceV2; extern bool tsMonitorForceV2;
// audit // audit

View File

@ -38,6 +38,13 @@ typedef enum {
SLOW_LOG_READ_QUIT = 3, SLOW_LOG_READ_QUIT = 3,
} SLOW_LOG_QUEUE_TYPE; } SLOW_LOG_QUEUE_TYPE;
static char* queueTypeStr[] = {
"SLOW_LOG_WRITE",
"SLOW_LOG_READ_RUNNING",
"SLOW_LOG_READ_BEGINNIG",
"SLOW_LOG_READ_QUIT"
};
#define SLOW_LOG_SEND_SIZE_MAX 1024*1024 #define SLOW_LOG_SEND_SIZE_MAX 1024*1024
typedef struct { typedef struct {
@ -65,7 +72,7 @@ typedef struct {
} MonitorSlowLogData; } MonitorSlowLogData;
void monitorClose(); void monitorClose();
void monitorInit(); int32_t monitorInit();
void monitorClientSQLReqInit(int64_t clusterKey); void monitorClientSQLReqInit(int64_t clusterKey);
void monitorClientSlowQueryInit(int64_t clusterId); void monitorClientSlowQueryInit(int64_t clusterId);

View File

@ -226,7 +226,6 @@ void monSetQmInfo(SMonQmInfo *pInfo);
void monSetSmInfo(SMonSmInfo *pInfo); void monSetSmInfo(SMonSmInfo *pInfo);
void monSetBmInfo(SMonBmInfo *pInfo); void monSetBmInfo(SMonBmInfo *pInfo);
void monGenAndSendReport(); void monGenAndSendReport();
void monGenAndSendReportBasic();
void monSendContent(char *pCont, const char* uri); void monSendContent(char *pCont, const char* uri);
void tFreeSMonMmInfo(SMonMmInfo *pInfo); void tFreeSMonMmInfo(SMonMmInfo *pInfo);

View File

@ -89,6 +89,7 @@ typedef struct SParseContext {
bool isView; bool isView;
bool isAudit; bool isAudit;
bool nodeOffline; bool nodeOffline;
bool isStmtBind;
const char* svrVer; const char* svrVer;
SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos

View File

@ -283,6 +283,7 @@ typedef struct SRequestObj {
bool inRetry; bool inRetry;
bool isSubReq; bool isSubReq;
bool inCallback; bool inCallback;
bool isStmtBind; // is statement bind parameter
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId; int64_t allocatorRefId;

View File

@ -864,10 +864,15 @@ void taos_init_imp(void) {
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
if (taosConvInit() != 0) { if (taosConvInit() != 0) {
tscInitRes = -1;
tscError("failed to init conv"); tscError("failed to init conv");
return; return;
} }
if (monitorInit() != 0){
tscInitRes = -1;
tscError("failed to init monitor");
return;
}
rpcInit(); rpcInit();
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
@ -891,7 +896,6 @@ void taos_init_imp(void) {
taosThreadMutexInit(&appInfo.mutex, NULL); taosThreadMutexInit(&appInfo.mutex, NULL);
tscCrashReportInit(); tscCrashReportInit();
monitorInit();
tscDebug("client is initialized successfully"); tscDebug("client is initialized successfully");
} }

View File

@ -206,6 +206,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql; (*pRequest)->validateOnly = validateSql;
(*pRequest)->isStmtBind = false;
((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param; ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
@ -266,7 +267,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.svrVer = pTscObj->sVer, .svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.isStmtBind = pRequest->isStmtBind};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);

View File

@ -18,6 +18,7 @@ int32_t quitCnt = 0;
tsem2_t monitorSem; tsem2_t monitorSem;
STaosQueue* monitorQueue; STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash; SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0};
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
if (tsTempDir == NULL) { if (tsTempDir == NULL) {
@ -453,6 +454,10 @@ static int64_t getFileSize(char* path){
} }
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){
if (data == NULL){
taosMemoryFree(fileName);
return -1;
}
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
if(pParam == NULL){ if(pParam == NULL){
taosMemoryFree(data); taosMemoryFree(data);
@ -468,18 +473,26 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
} }
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if(pInst == NULL){
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
return -1;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size);
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep);
}
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){
int64_t size = getFileSize(*fileName); int64_t size = getFileSize(*fileName);
if(size <= offset){ if(size <= offset){
processFileInTheEnd(pFile, *fileName); processFileInTheEnd(pFile, *fileName);
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
}else{ }else{
char* data = readFile(pFile, &offset, size); int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
if(data != NULL){ tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); *fileName = NULL;
*fileName = NULL;
}
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data);
} }
} }
@ -500,17 +513,8 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){
tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile); tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
pClient->offset = 0; pClient->offset = 0;
}else{ }else{
SAppInstInfo* pInst = getAppInstByClusterId(clusterId); int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
if(pInst == NULL){ tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
return;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pClient->pFile, &pClient->offset, size);
if(data != NULL){
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
}
tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data);
} }
} }
@ -532,16 +536,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
return true; return true;
} }
}else{ }else{
SAppInstInfo* pInst = getAppInstByClusterId(clusterId); int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
if(pInst == NULL) { tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
return true;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pClient->pFile, &pClient->offset, size);
if(data != NULL){
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
}
tscInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data);
} }
return false; return false;
} }
@ -558,16 +554,11 @@ static void monitorSendAllSlowLogAtQuit(){
pClient->pFile = NULL; pClient->pFile = NULL;
}else if(pClient->offset == 0){ }else if(pClient->offset == 0){
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
if(pInst == NULL) { tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code);
continue; if (code == 0){
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pClient->pFile, &pClient->offset, size);
if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
quitCnt ++; quitCnt ++;
} }
tscInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data);
} }
} }
} }
@ -613,12 +604,8 @@ static void monitorSendAllSlowLog(){
} }
continue; continue;
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp); int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
char* data = readFile(pClient->pFile, &pClient->offset, size); tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code);
if(data != NULL){
sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
}
tscDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data);
} }
} }
} }
@ -631,7 +618,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
return; return;
} }
char namePrefix[PATH_MAX] = {0}; char namePrefix[PATH_MAX] = {0};
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
tscError("failed to generate slow log file name prefix"); tscError("failed to generate slow log file name prefix");
return; return;
} }
@ -656,7 +643,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
if (strcmp(name, ".") == 0 || if (strcmp(name, ".") == 0 ||
strcmp(name, "..") == 0 || strcmp(name, "..") == 0 ||
strstr(name, namePrefix) == NULL) { strstr(name, namePrefix) == NULL) {
tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId);
continue; continue;
} }
@ -672,9 +659,8 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
taosCloseFile(&pFile); taosCloseFile(&pFile);
continue; continue;
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char *tmp = taosStrdup(filename); char *tmp = taosStrdup(filename);
monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep); monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
taosMemoryFree(tmp); taosMemoryFree(tmp);
} }
@ -690,28 +676,6 @@ static void* monitorThreadFunc(void *param){
} }
#endif #endif
char tmpPath[PATH_MAX] = {0};
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){
return NULL;
}
if (taosMulModeMkDir(tmpPath, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
printf("failed to create dir:%s since %s", tmpPath, terrstr());
return NULL;
}
if (tsem2_init(&monitorSem, 0, 0) != 0) {
tscError("sem init error since %s", terrstr());
return NULL;
}
monitorQueue = taosOpenQueue();
if(monitorQueue == NULL){
tscError("open queue error since %s", terrstr());
return NULL;
}
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL; return NULL;
} }
@ -738,16 +702,12 @@ static void* monitorThreadFunc(void *param){
if (slowLogData != NULL) { if (slowLogData != NULL) {
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){
if(slowLogData->pFile != NULL){ if(slowLogData->pFile != NULL){
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset);
if(pInst != NULL) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep);
}
}else{ }else{
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
} }
} else if(slowLogData->type == SLOW_LOG_WRITE){ } else if(slowLogData->type == SLOW_LOG_WRITE){
monitorWriteSlowLog2File(slowLogData, tmpPath); monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
} else if(slowLogData->type == SLOW_LOG_READ_RUNNING){ } else if(slowLogData->type == SLOW_LOG_READ_RUNNING){
monitorSendSlowLogAtRunning(slowLogData->clusterId); monitorSendSlowLogAtRunning(slowLogData->clusterId);
} else if(slowLogData->type == SLOW_LOG_READ_QUIT){ } else if(slowLogData->type == SLOW_LOG_READ_QUIT){
@ -799,27 +759,59 @@ static void tscMonitorStop() {
} }
} }
void monitorInit() { int32_t monitorInit() {
tscInfo("[monitor] tscMonitor init"); tscInfo("[monitor] tscMonitor init");
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) { if (monitorCounterHash == NULL) {
tscError("failed to create monitorCounterHash"); tscError("failed to create monitorCounterHash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) { if (monitorSlowLogHash == NULL) {
tscError("failed to create monitorSlowLogHash"); tscError("failed to create monitorSlowLogHash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) { if (monitorTimer == NULL) {
tscError("failed to create monitor timer"); tscError("failed to create monitor timer");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0){
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return -1;
}
if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
return -1;
}
if (tsem2_init(&monitorSem, 0, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tscError("sem init error since %s", terrstr());
return -1;
}
monitorQueue = taosOpenQueue();
if(monitorQueue == NULL){
tscError("open queue error since %s", terrstr());
return -1;
} }
taosInitRWLatch(&monitorLock); taosInitRWLatch(&monitorLock);
tscMonitortInit(); if (tscMonitortInit() != 0){
return -1;
}
return 0;
} }
void monitorClose() { void monitorClose() {
@ -844,10 +836,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
return -1; return -1;
} }
*slowLogData = data; *slowLogData = data;
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data);
while (atomic_load_32(&slowLogFlag) == -1) {
taosMsleep(5);
}
if (taosWriteQitem(monitorQueue, slowLogData) == 0){ if (taosWriteQitem(monitorQueue, slowLogData) == 0){
tsem2_post(&monitorSem); tsem2_post(&monitorSem);
}else{ }else{

View File

@ -154,13 +154,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){ if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
}else{
MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
} }
MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
} }
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);

View File

@ -72,6 +72,7 @@ static int32_t stmtCreateRequest(STscStmt* pStmt) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pStmt->exec.pRequest->syncQuery = true; pStmt->exec.pRequest->syncQuery = true;
pStmt->exec.pRequest->isStmtBind = true;
} }
} }
@ -830,6 +831,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->bInfo.needParse = true; pStmt->bInfo.needParse = true;
pStmt->sql.status = STMT_INIT; pStmt->sql.status = STMT_INIT;
pStmt->reqid = reqid; pStmt->reqid = reqid;
pStmt->errCode = TSDB_CODE_SUCCESS;
if (NULL != pOptions) { if (NULL != pOptions) {
memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
@ -882,6 +884,10 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_DLOG_E("start to prepare"); STMT_DLOG_E("start to prepare");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (pStmt->sql.status >= STMT_PREPARE) { if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtResetStmt(pStmt)); STMT_ERR_RET(stmtResetStmt(pStmt));
} }
@ -953,6 +959,10 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_DLOG("start to set tbName: %s", tbName); STMT_DLOG("start to set tbName: %s", tbName);
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
int32_t insert = 0; int32_t insert = 0;
@ -999,8 +1009,18 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_DLOG_E("start to set tbTags"); STMT_DLOG_E("start to set tbTags");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
SBoundColInfo *tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
tscWarn("no tags bound in sql, will not bound tags");
return TSDB_CODE_SUCCESS;
}
if (pStmt->bInfo.inExecCache) { if (pStmt->bInfo.inExecCache) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1021,6 +1041,10 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
} }
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds"); tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -1039,6 +1063,10 @@ int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
} }
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds"); tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -1150,8 +1178,13 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx); STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
@ -1307,6 +1340,10 @@ int stmtAddBatch(TAOS_STMT* stmt) {
STMT_DLOG_E("start to add batch"); STMT_DLOG_E("start to add batch");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
if (pStmt->sql.stbInterlaceMode) { if (pStmt->sql.stbInterlaceMode) {
@ -1471,6 +1508,10 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_DLOG_E("start to exec"); STMT_DLOG_E("start to exec");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
@ -1599,6 +1640,10 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STMT_DLOG_E("start to get tag fields"); STMT_DLOG_E("start to get tag fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
@ -1637,6 +1682,10 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STMT_DLOG_E("start to get col fields"); STMT_DLOG_E("start to get col fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
@ -1674,6 +1723,10 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STMT_DLOG_E("start to get param num"); STMT_DLOG_E("start to get param num");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
@ -1706,6 +1759,10 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
STMT_DLOG_E("start to get param"); STMT_DLOG_E("start to get param");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
} }

View File

@ -111,7 +111,6 @@ uint16_t tsMonitorPort = 6043;
int32_t tsMonitorMaxLogs = 100; int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false; bool tsMonitorComp = false;
bool tsMonitorLogProtocol = false; bool tsMonitorLogProtocol = false;
int32_t tsMonitorIntervalForBasic = 30;
bool tsMonitorForceV2 = true; bool tsMonitorForceV2 = true;
// audit // audit
@ -712,7 +711,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -1165,7 +1163,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval; tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32; tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32;
tsMonitorLogProtocol = cfgGetItem(pCfg, "monitorLogProtocol")->bval; tsMonitorLogProtocol = cfgGetItem(pCfg, "monitorLogProtocol")->bval;
tsMonitorIntervalForBasic = cfgGetItem(pCfg, "monitorIntervalForBasic")->i32;
tsMonitorForceV2 = cfgGetItem(pCfg, "monitorForceV2")->i32; tsMonitorForceV2 = cfgGetItem(pCfg, "monitorForceV2")->i32;
tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; tsEnableAudit = cfgGetItem(pCfg, "audit")->bval;

View File

@ -43,7 +43,6 @@ typedef struct SDnodeMgmt {
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq; int32_t statusSeq;
SendMonitorReportFp sendMonitorReportFpBasic;
} SDnodeMgmt; } SDnodeMgmt;
// dmHandle.c // dmHandle.c

View File

@ -65,7 +65,6 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp;
pMgmt->sendMonitorReportFpBasic = pInput->sendMonitorReportFpBasic;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;

View File

@ -175,15 +175,6 @@ static void *dmMonitorThreadFp(void *param) {
taosMemoryTrim(0); taosMemoryTrim(0);
} }
} }
if(tsMonitorForceV2){
if (curTime < lastTimeForBasic) lastTimeForBasic = curTime;
float intervalForBasic = (curTime - lastTimeForBasic) / 1000.0f;
if (intervalForBasic >= tsMonitorIntervalForBasic) {
(*pMgmt->sendMonitorReportFpBasic)();
lastTimeForBasic = curTime;
}
}
} }
return NULL; return NULL;

View File

@ -287,7 +287,8 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
return -1; return -1;
} }
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
if (pMsg == NULL) { if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL; pRpc->pCont = NULL;

View File

@ -128,7 +128,6 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c // dmMonitor.c
void dmSendMonitorReport(); void dmSendMonitorReport();
void dmSendAuditRecords(); void dmSendAuditRecords();
void dmSendMonitorReportBasic();
void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo);

View File

@ -394,7 +394,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.processDropNodeFp = dmProcessDropNodeReq, .processDropNodeFp = dmProcessDropNodeReq,
.sendMonitorReportFp = dmSendMonitorReport, .sendMonitorReportFp = dmSendMonitorReport,
.sendAuditRecordFp = auditSendRecordsInBatch, .sendAuditRecordFp = auditSendRecordsInBatch,
.sendMonitorReportFpBasic = dmSendMonitorReportBasic,
.getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.getMnodeLoadsFp = dmGetMnodeLoads, .getMnodeLoadsFp = dmGetMnodeLoads,

View File

@ -123,16 +123,6 @@ void dmSendMonitorReport() {
monGenAndSendReport(); monGenAndSendReport();
} }
void dmSendMonitorReportBasic() {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SDnode *pDnode = dmInstance();
dmGetDmMonitorInfoBasic(pDnode);
dmGetMmMonitorInfo(pDnode);
monGenAndSendReportBasic();
}
//Todo: put this in seperate file in the future //Todo: put this in seperate file in the future
void dmSendAuditRecords() { void dmSendAuditRecords() {
auditSendRecordsInBatch(); auditSendRecordsInBatch();

View File

@ -208,7 +208,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
} }
pRpc->info.wrapper = pWrapper; pRpc->info.wrapper = pWrapper;
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed
pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
if (pMsg == NULL) goto _OVER; if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));

View File

@ -155,7 +155,6 @@ typedef struct {
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordFp; SendAuditRecordsFp sendAuditRecordFp;
SendMonitorReportFp sendMonitorReportFpBasic;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp; GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;

View File

@ -568,6 +568,25 @@ void monSendReport(SMonInfo *pMonitor){
} }
} }
void monSendReportBasic(SMonInfo *pMonitor) {
char *pCont = tjsonToString(pMonitor->pJson);
if (tsMonitorLogProtocol) {
if (pCont != NULL) {
uInfoL("report cont basic:\n%s", pCont);
} else {
uInfo("report cont basic is null");
}
}
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) !=
0) {
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
}
}
void monGenAndSendReport() { void monGenAndSendReport() {
SMonInfo *pMonitor = monCreateMonitorInfo(); SMonInfo *pMonitor = monCreateMonitorInfo();
if (pMonitor == NULL) return; if (pMonitor == NULL) return;
@ -595,38 +614,11 @@ void monGenAndSendReport() {
monGenVnodeRoleTable(pMonitor); monGenVnodeRoleTable(pMonitor);
monSendPromReport(); monSendPromReport();
} if (pMonitor->mmInfo.cluster.first_ep_dnode_id != 0) {
monGenBasicJsonBasic(pMonitor);
monCleanupMonitorInfo(pMonitor); monGenClusterJsonBasic(pMonitor);
} monSendReportBasic(pMonitor);
void monSendReportBasic(SMonInfo *pMonitor){
char *pCont = tjsonToString(pMonitor->pJson);
if(tsMonitorLogProtocol){
if(pCont != NULL){
uInfoL("report cont basic:\n%s", pCont);
} }
else{
uInfo("report cont basic is null");
}
}
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
}
}
void monGenAndSendReportBasic() {
SMonInfo *pMonitor = monCreateMonitorInfo();
monGenBasicJsonBasic(pMonitor);
monGenClusterJsonBasic(pMonitor);
if (pMonitor->mmInfo.cluster.first_ep_dnode_id != 0) {
monSendReportBasic(pMonitor);
} }
monCleanupMonitorInfo(pMonitor); monCleanupMonitorInfo(pMonitor);

View File

@ -30,6 +30,7 @@ typedef struct SInsertParseContext {
bool forceUpdate; bool forceUpdate;
bool needTableTagVal; bool needTableTagVal;
bool needRequest; // whether or not request server bool needRequest; // whether or not request server
bool isStmtBind; // whether is stmt bind
} SInsertParseContext; } SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@ -1978,7 +1979,6 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) { SToken* pToken) {
SBoundColInfo* pCols = &pTableCxt->boundColsInfo; SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1996,7 +1996,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]); SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
if (pToken->type == TK_NK_QUESTION) { if (pToken->type == TK_NK_QUESTION) {
isParseBindParam = true; pCxt->isStmtBind = true;
if (NULL == pCxt->pComCxt->pStmtCb) { if (NULL == pCxt->pComCxt->pStmtCb) {
code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
break; break;
@ -2007,8 +2007,8 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
break; break;
} }
if (isParseBindParam) { if (pCxt->isStmtBind) {
code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); code = buildInvalidOperationMsg(&pCxt->msg, "stmt bind param does not support normal value in sql");
break; break;
} }
@ -2025,7 +2025,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
} }
} }
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -2035,7 +2035,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
} }
} }
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
*pGotRow = true; *pGotRow = true;
} }
@ -2410,6 +2410,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
} }
if (TK_NK_QUESTION == pTbName->type) { if (TK_NK_QUESTION == pTbName->type) {
pCxt->isStmtBind = true;
if (NULL == pCxt->pComCxt->pStmtCb) { if (NULL == pCxt->pComCxt->pStmtCb) {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z); return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
} }
@ -2443,6 +2444,13 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
pTbName->n = strlen(tbName); pTbName->n = strlen(tbName);
} }
if (pCxt->isStmtBind) {
if (TK_NK_ID == pTbName->type || (tbNameAfterDbName != NULL && *(tbNameAfterDbName + 1) != '?')) {
// In SQL statements, the table name has already been specified.
parserWarn("0x%" PRIx64 " table name is specified in sql, ignore the table name in bind param", pCxt->pComCxt->requestId);
}
}
*pHasData = true; *pHasData = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2936,7 +2944,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
.missCache = false, .missCache = false,
.usingDuplicateTable = false, .usingDuplicateTable = false,
.needRequest = true, .needRequest = true,
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)}; .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false),
.isStmtBind = pCxt->isStmtBind};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -29,6 +29,12 @@ typedef struct {
uint64_t nice; uint64_t nice;
uint64_t system; uint64_t system;
uint64_t idle; uint64_t idle;
uint64_t wa;
uint64_t hi;
uint64_t si;
uint64_t st;
uint64_t guest;
uint64_t guest_nice;
} SysCpuInfo; } SysCpuInfo;
typedef struct { typedef struct {
@ -173,8 +179,11 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
} }
char cpu[10] = {0}; char cpu[10] = {0};
sscanf(line, "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, cpu, &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system, sscanf(line,
&cpuInfo->idle); "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64
" %" PRIu64,
cpu, &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system, &cpuInfo->idle, &cpuInfo->wa, &cpuInfo->hi,
&cpuInfo->si, &cpuInfo->st, &cpuInfo->guest, &cpuInfo->guest_nice);
taosCloseFile(&pFile); taosCloseFile(&pFile);
#endif #endif
@ -576,7 +585,8 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
SysCpuInfo sysCpu = {0}; SysCpuInfo sysCpu = {0};
ProcCpuInfo procCpu = {0}; ProcCpuInfo procCpu = {0};
if (taosGetSysCpuInfo(&sysCpu) == 0 && taosGetProcCpuInfo(&procCpu) == 0) { if (taosGetSysCpuInfo(&sysCpu) == 0 && taosGetProcCpuInfo(&procCpu) == 0) {
curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system; curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system + sysCpu.wa + sysCpu.hi + sysCpu.si + sysCpu.st +
sysCpu.guest + sysCpu.guest_nice;
curSysTotal = curSysUsed + sysCpu.idle; curSysTotal = curSysUsed + sysCpu.idle;
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;

View File

@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
if (alloced > tsRpcQueueMemoryAllowed) { if (alloced > tsRpcQueueMemoryAllowed) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsRpcQueueMemoryUsed); tsRpcQueueMemoryAllowed);
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
taosMemoryFree(pNode); taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
@ -494,6 +494,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
qall->start = queue->head; qall->start = queue->head;
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
qall->memOfItems = queue->memOfItems; qall->memOfItems = queue->memOfItems;
qall->unAccessedNumOfItems = queue->numOfItems;
qall->unAccessMemOfItems = queue->memOfItems;
code = qall->numOfItems; code = qall->numOfItems;
qinfo->ahandle = queue->ahandle; qinfo->ahandle = queue->ahandle;

View File

@ -28,8 +28,7 @@ from frame import *
class TDTestCase(TBase): class TDTestCase(TBase):
# fix
# fix
def FIX_TD_30686(self): def FIX_TD_30686(self):
tdLog.info("check bug TD_30686 ...\n") tdLog.info("check bug TD_30686 ...\n")
sqls = [ sqls = [
@ -49,6 +48,32 @@ class TDTestCase(TBase):
] ]
tdSql.checkDataMem(sql, results) tdSql.checkDataMem(sql, results)
def FIX_TS_5105(self):
tdLog.info("check bug TS_5105 ...\n")
ts1 = "2024-07-03 10:00:00.000"
ts2 = "2024-07-03 13:00:00.000"
sqls = [
"drop database if exists ts_5105",
"create database ts_5105 cachemodel 'both';",
"use ts_5105;",
"CREATE STABLE meters (ts timestamp, current float) TAGS (location binary(64), groupId int);",
"CREATE TABLE d1001 USING meters TAGS ('California.B', 2);",
"CREATE TABLE d1002 USING meters TAGS ('California.S', 3);",
f"INSERT INTO d1001 VALUES ('{ts1}', 10);",
f"INSERT INTO d1002 VALUES ('{ts2}', 13);",
]
tdSql.executes(sqls)
sql = "select last(ts), last_row(ts) from meters;"
# 执行多次有些时候last_row(ts)会返回错误的值详见TS-5105
for i in range(1, 10):
tdLog.debug(f"{i}th execute sql: {sql}")
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, ts2)
tdSql.checkData(0, 1, ts2)
# run # run
def run(self): def run(self):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
@ -57,11 +82,10 @@ class TDTestCase(TBase):
self.FIX_TD_30686() self.FIX_TD_30686()
# TS BUGS # TS BUGS
self.FIX_TS_5105()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())

View File

@ -1,44 +0,0 @@
# -*- coding: utf-8 -*-
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
sqls = [
"drop database if exists ts_5101",
"create database ts_5101 cachemodel 'both';",
"use ts_5101;",
"CREATE STABLE meters (ts timestamp, current float) TAGS (location binary(64), groupId int);",
"CREATE TABLE d1001 USING meters TAGS ('California.B', 2);",
"CREATE TABLE d1002 USING meters TAGS ('California.S', 3);",
"INSERT INTO d1001 VALUES ('2024-07-03 10:00:00.000', 10);",
"INSERT INTO d1002 VALUES ('2024-07-03 13:00:00.000', 13);",
]
tdSql.executes(sqls)
# 执行多次有些时候last_row(ts)会返回错误的值详见TS-5105
for i in range(1, 10):
sql = "select last(ts), last_row(ts) from meters;"
tdLog.debug(f"{i}th execute sql: {sql}")
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, "2024-07-03 13:00:00.000")
tdSql.checkData(0, 1, "2024-07-03 13:00:00.000")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -20,7 +20,6 @@
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py ,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py ,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py
,,y,army,./pytest.sh python3 ./test.py -f query/query_last_row_repeatly.py
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py ,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py

View File

@ -24,7 +24,7 @@ class TDTestCase:
case1 <wenzhouwww>: [TD-11899] : this is an test case for check stmt error use . case1 <wenzhouwww>: [TD-11899] : this is an test case for check stmt error use .
''' '''
return return
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
@ -49,7 +49,7 @@ class TDTestCase:
ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)", ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)",
) )
conn.load_table_info("log") conn.load_table_info("log")
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16) params = new_bind_params(16)
@ -123,7 +123,7 @@ class TDTestCase:
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )", ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )",
) )
conn.load_table_info("log") conn.load_table_info("log")
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1000)") stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1000)")
params = new_bind_params(16) params = new_bind_params(16)
@ -195,20 +195,74 @@ class TDTestCase:
except Exception as err: except Exception as err:
conn.close() conn.close()
raise err raise err
def test_stmt_nornmal_value_error(self, conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stmt_error"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )",
)
conn.load_table_info("log")
stmt = conn.statement("insert into log values(NOW(),?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
params[1].bool(True)
params[2].tinyint(None)
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
stmt.bind_param(params)
stmt.execute()
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def run(self): def run(self):
self.test_stmt_insert(self.conn()) self.test_stmt_insert(self.conn())
try: try:
self.test_stmt_insert_error(self.conn()) self.test_stmt_insert_error(self.conn())
except Exception as error : except Exception as error :
if str(error)=='[0x0200]: no mix usage for ? and values': if str(error)=='[0x0200]: stmt bind param does not support normal value in sql':
tdLog.info('=========stmt error occured for bind part column ==============') tdLog.info('=========stmt error occured for bind part column ==============')
else: else:
tdLog.exit("expect error(%s) not occured" % str(error)) tdLog.exit("expect error(%s) not occured" % str(error))
try: try:
self.test_stmt_nornmal_value_error(self.conn())
except Exception as error :
if str(error)=='[0x0200]: stmt bind param does not support normal value in sql':
tdLog.info('=========stmt error occured for bind part column ==============')
else:
tdLog.exit("expect error(%s) not occured" % str(error))
try:
self.test_stmt_insert_error_null_timestamp(self.conn()) self.test_stmt_insert_error_null_timestamp(self.conn())
tdLog.exit("expect error not occured - 1") tdLog.exit("expect error not occured - 1")
except Exception as error : except Exception as error :

View File

@ -30,7 +30,7 @@
#include "taos.h" #include "taos.h"
class taoscTest : public ::testing::Test { class taoscTest : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
printf("start test setup.\n"); printf("start test setup.\n");
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -188,7 +188,7 @@ TEST_F(taoscTest, taos_query_test) {
void queryCallback2(void* param, void* res, int32_t code) { void queryCallback2(void* param, void* res, int32_t code) {
ASSERT_TRUE(code == 0); ASSERT_TRUE(code == 0);
ASSERT_TRUE(param == pUserParam); ASSERT_TRUE(param == pUserParam);
// After using taos_query_a to query, using taos_fetch_row in the callback will cause blocking. // After using taos_query_a to query, using taos_fetch_row in the callback will cause blocking.
// Reason: schProcessOnCbBegin SCH_LOCK_TASK(pTask) // Reason: schProcessOnCbBegin SCH_LOCK_TASK(pTask)
TAOS_ROW row; TAOS_ROW row;
row = taos_fetch_row(res); row = taos_fetch_row(res);
@ -254,7 +254,7 @@ TEST_F(taoscTest, taos_query_a_fetch_row) {
printf("taos_query_a_fetch_row taos_fetch_row start...\n"); printf("taos_query_a_fetch_row taos_fetch_row start...\n");
while ((row = taos_fetch_row(*pres))) { while ((row = taos_fetch_row(*pres))) {
getRecordCounts++; getRecordCounts++;
} }
printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts); printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts);
taos_free_result(*pres); taos_free_result(*pres);
@ -264,4 +264,3 @@ TEST_F(taoscTest, taos_query_a_fetch_row) {
printf("taos_query_a_fetch_row test finished.\n"); printf("taos_query_a_fetch_row test finished.\n");
} }