Merge pull request #26380 from taosdata/fix/TS-4921

fix:[TS-4921]refactor reporting logic for slow log
This commit is contained in:
dapan1121 2024-07-04 14:45:13 +08:00 committed by GitHub
commit 9493ad96b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 649 additions and 380 deletions

View File

@ -25,13 +25,20 @@ extern "C" {
#include "query.h"
#include "tqueue.h"
typedef enum SQL_RESULT_CODE {
typedef enum {
SQL_RESULT_SUCCESS = 0,
SQL_RESULT_FAILED = 1,
SQL_RESULT_CANCEL = 2,
} SQL_RESULT_CODE;
#define SLOW_LOG_SEND_SIZE 32*1024
typedef enum {
SLOW_LOG_WRITE = 0,
SLOW_LOG_READ_RUNNING = 1,
SLOW_LOG_READ_BEGINNIG = 2,
SLOW_LOG_READ_QUIT = 3,
} SLOW_LOG_QUEUE_TYPE;
#define SLOW_LOG_SEND_SIZE_MAX 1024*1024
typedef struct {
int64_t clusterId;
@ -43,12 +50,18 @@ typedef struct {
typedef struct {
TdFilePtr pFile;
void* timer;
int64_t lastCheckTime;
char path[PATH_MAX];
int64_t offset;
} SlowLogClient;
typedef struct {
int64_t clusterId;
char *value;
SLOW_LOG_QUEUE_TYPE type;
char* data;
int64_t offset;
TdFilePtr pFile;
char* fileName;
} MonitorSlowLogData;
void monitorClose();
@ -60,7 +73,7 @@ void monitorCreateClient(int64_t clusterId);
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys);
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values);
const char* monitorResultStr(SQL_RESULT_CODE code);
int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value);
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data);
#ifdef __cplusplus
}
#endif

View File

@ -157,7 +157,11 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
}
char* value = cJSON_PrintUnformatted(json);
if(monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, value) < 0){
MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_WRITE;
data.data = value;
if(monitorPutData2MonitorQueue(data) < 0){
taosMemoryFree(value);
}
@ -202,9 +206,10 @@ static void deregisterRequest(SRequestObj *pRequest) {
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
if (pRequest->pQuery && pRequest->pQuery->pRoot) {
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
if ((pRequest->pQuery && pRequest->pQuery->pRoot &&
QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
@ -220,7 +225,6 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
reqType = SLOW_LOG_TYPE_QUERY;
}
}
nodesSimReleaseAllocator(pRequest->allocatorRefId);
}

View File

@ -19,6 +19,7 @@
#include "scheduler.h"
#include "trpc.h"
#include "tglobal.h"
#include "clientMonitor.h"
typedef struct {
union {
@ -546,10 +547,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
}
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
@ -560,6 +557,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
return -1;
}
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));

File diff suppressed because it is too large Load Diff

View File

@ -155,7 +155,10 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
}
monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, NULL);
MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
}

View File

@ -1380,7 +1380,7 @@ void freeSSmlKv(void *data) {
void smlDestroyInfo(SSmlHandle *info) {
if (!info) return;
qDestroyQuery(info->pQuery);
// qDestroyQuery(info->pQuery);
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables);
@ -1912,6 +1912,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
return (TAOS_RES *)request;
}
info->pRequest = request;
info->pRequest->pQuery = info->pQuery;
info->ttl = ttl;
info->precision = precision;
info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;

View File

@ -64,6 +64,65 @@ int main(int argc, char** argv) {
// clusterMonitorClose(cluster2);
//}
static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){
uError("failed to seek file:%p code: %d", pFile, errno);
return NULL;
}
ASSERT(size > *offset);
char* pCont = NULL;
int64_t totalSize = 0;
if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
pCont = (char*)taosMemoryCalloc(1, 2 * SLOW_LOG_SEND_SIZE_MAX);
totalSize = 2 * SLOW_LOG_SEND_SIZE_MAX;
}else{
pCont = (char*)taosMemoryCalloc(1, 2 * (size - *offset));
totalSize = 2 * (size - *offset);
}
if(pCont == NULL){
uError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
return NULL;
}
char* buf = pCont;
strcat(buf++, "[");
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
if (readSize <= 0) {
if (readSize < 0){
uError("failed to read len from file:%p since %s", pFile, terrstr());
}
taosMemoryFree(pCont);
return NULL;
}
totalSize = 0;
while(1){
size_t len = strlen(buf);
totalSize += (len+1);
if (totalSize > readSize || len == 0) {
*(buf-1) = ']';
*buf = '\0';
break;
}
buf[len] = ','; // replace '\0' with ','
buf += (len + 1);
*offset += (len+1);
}
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont);
return pCont;
}
static int64_t getFileSize(char* path){
int64_t fileSize = 0;
if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
return -1;
}
return fileSize;
}
TEST(clientMonitorTest, sendTest) {
TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
ASSERT_TRUE(taos != NULL);
@ -91,8 +150,8 @@ TEST(clientMonitorTest, ReadOneFile) {
}
const int batch = 10;
const int size = SLOW_LOG_SEND_SIZE/batch;
for(int i = 0; i < batch + 1; i++){
const int size = 10;
for(int i = 0; i < batch; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
@ -106,14 +165,22 @@ TEST(clientMonitorTest, ReadOneFile) {
// Create an SEpSet object and set it up for testing
SEpSet* epSet = NULL;
int64_t fileSize = getFileSize("./tdengine-1-wewe");
// Call the function to be tested
// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet);
char value[size] = {0};
memset(value, '0', size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
int64_t offset = 0;
while(1){
if (offset >= fileSize) {
break;
}
char* val = readFile(pFile, &offset, fileSize);
printf("offset:%lld,fileSize:%lld,val:%s\n", offset, fileSize, val);
}
// char value[size] = {0};
// memset(value, '0', size - 1);
// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
// uError("failed to write len to file:%p since %s", pFile, terrstr());
// }
// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet);
@ -121,49 +188,49 @@ TEST(clientMonitorTest, ReadOneFile) {
taosCloseFile(&pFile);
}
TEST(clientMonitorTest, ReadTwoFile) {
// Create a TdFilePtr object and set it up for testing
TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
const int batch = 10;
const int size = SLOW_LOG_SEND_SIZE/batch;
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
SAppInstInfo pAppInfo = {0};
pAppInfo.clusterId = 2;
pAppInfo.monitorParas.tsEnableMonitor = 1;
strcpy(tsTempDir,"/tmp");
// monitorSendAllSlowLogFromTempDir(&pAppInfo);
}
//TEST(clientMonitorTest, ReadTwoFile) {
// // Create a TdFilePtr object and set it up for testing
//
// TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
// if (pFile == NULL) {
// uError("failed to open file:./test.txt since %s", terrstr());
// return;
// }
//
// const int batch = 10;
// const int size = SLOW_LOG_SEND_SIZE/batch;
// for(int i = 0; i < batch + 1; i++){
// char value[size] = {0};
// memset(value, '0' + i, size - 1);
// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
// uError("failed to write len to file:%p since %s", pFile, terrstr());
// }
// }
//
// taosFsyncFile(pFile);
// taosCloseFile(&pFile);
//
// pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
// if (pFile == NULL) {
// uError("failed to open file:./test.txt since %s", terrstr());
// return;
// }
//
// for(int i = 0; i < batch + 1; i++){
// char value[size] = {0};
// memset(value, '0' + i, size - 1);
// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
// uError("failed to write len to file:%p since %s", pFile, terrstr());
// }
// }
//
// taosFsyncFile(pFile);
// taosCloseFile(&pFile);
//
// SAppInstInfo pAppInfo = {0};
// pAppInfo.clusterId = 2;
// pAppInfo.monitorParas.tsEnableMonitor = 1;
// strcpy(tsTempDir,"/tmp");
//// monitorSendAllSlowLogFromTempDir(&pAppInfo);
//
//}

View File

@ -1065,6 +1065,7 @@ int sml_escape1_Test() {
for(int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++){
pRes = taos_schemaless_insert(taos, (char**)&sql[i], 1, TSDB_SML_LINE_PROTOCOL, 0);
int code = taos_errno(pRes);
taos_free_result(pRes);
ASSERT(code);
}