Merge pull request #26791 from taosdata/opti/TD-31097

fix:[TD-31097]process return value in client
This commit is contained in:
dapan1121 2024-07-29 09:34:49 +08:00 committed by GitHub
commit 7ccc2d1139
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 548 additions and 423 deletions

View File

@ -555,7 +555,6 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj
(*pRequest)->pDb = getDbOfConnection(pTscObj); (*pRequest)->pDb = getDbOfConnection(pTscObj);
(*pRequest)->pTscObj = pTscObj; (*pRequest)->pTscObj = pTscObj;
(*pRequest)->inCallback = false; (*pRequest)->inCallback = false;
(*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
if (NULL == (*pRequest)->msgBuf) { if (NULL == (*pRequest)->msgBuf) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -332,7 +332,6 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int64_t transporterId = 0; int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg)); TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg));
(void)tsem_wait(&pRequest->body.rspSem); (void)tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -21,13 +21,10 @@ SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0}; char tmpSlowLogPath[PATH_MAX] = {0};
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
if (tsTempDir == NULL) {
return -1;
}
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
if (ret < 0) { if (ret < 0) {
tscError("failed to get tmp path ret:%d", ret); tscError("failed to get tmp path ret:%d", ret);
return ret; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
return 0; return 0;
} }
@ -71,10 +68,9 @@ static void destroyMonitorClient(void* data) {
if (pMonitor == NULL) { if (pMonitor == NULL) {
return; return;
} }
taosTmrStopA(&pMonitor->timer); (void)taosTmrStopA(&pMonitor->timer);
taosHashCleanup(pMonitor->counters); taosHashCleanup(pMonitor->counters);
taos_collector_registry_destroy(pMonitor->registry); (void)taos_collector_registry_destroy(pMonitor->registry);
// taos_collector_destroy(pMonitor->colector);
taosMemoryFree(pMonitor); taosMemoryFree(pMonitor);
} }
@ -142,15 +138,17 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
tscError("sendReport failed, out of memory, len:%d", tlen); tscError("sendReport failed, out of memory, len:%d", tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAILED; goto FAILED;
} }
tSerializeSStatisReq(buf, tlen, &sStatisReq); tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
if (tlen < 0) {
taosMemoryFree(buf);
goto FAILED;
}
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
tscError("sendReport failed, out of memory send info"); tscError("sendReport failed, out of memory send info");
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(buf); taosMemoryFree(buf);
goto FAILED; goto FAILED;
} }
@ -168,12 +166,12 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
FAILED: FAILED:
monitorFreeSlowLogDataEx(param); monitorFreeSlowLogDataEx(param);
return -1; return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
} }
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) { static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
char ts[50] = {0}; char ts[50] = {0};
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); (void)sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
if (NULL == pCont) { if (NULL == pCont) {
tscError("generateClusterReport failed, get null content."); tscError("generateClusterReport failed, get null content.");
@ -181,7 +179,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
} }
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
taos_collector_registry_clear_batch(registry); (void)taos_collector_registry_clear_batch(registry);
} }
taosMemoryFreeClear(pCont); taosMemoryFreeClear(pCont);
} }
@ -202,7 +200,7 @@ static void reportSendProcess(void* param, void* tmrId) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); (void)taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
taosRUnLockLatch(&monitorLock); taosRUnLockLatch(&monitorLock);
} }
@ -250,14 +248,13 @@ void monitorCreateClient(int64_t clusterId) {
goto fail; goto fail;
} }
taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); (void)taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
pMonitor->counters = pMonitor->counters =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pMonitor->counters == NULL) { if (pMonitor->counters == NULL) {
tscError("failed to create monitor counters"); tscError("failed to create monitor counters");
goto fail; goto fail;
} }
// taosHashSetFreeFp(pMonitor->counters, destroyCounter);
if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) { if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
tscError("failed to put monitor client to hash"); tscError("failed to put monitor client to hash");
@ -300,10 +297,14 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL) return; if (newCounter == NULL) return;
MonitorClient* pMonitor = *ppMonitor; MonitorClient* pMonitor = *ppMonitor;
taos_collector_add_metric(pMonitor->colector, newCounter); if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){
tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter);
goto end;
}
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor"); tscError("failed to put counter to monitor");
taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
goto end; goto end;
} }
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
@ -332,7 +333,10 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName);
goto end; goto end;
} }
taos_counter_inc(*ppCounter, label_values); if (taos_counter_inc(*ppCounter, label_values) != 0){
tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName);
goto end;
}
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end: end:
@ -360,24 +364,23 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
tscInfo("[monitor] create slow log file:%s", path); tscInfo("[monitor] create slow log file:%s", path);
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); tscError("failed to open file:%s since %d", path, errno);
tscError("failed to open file:%s since %s", path, terrstr());
return; return;
} }
SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
if (pClient == NULL) { if (pClient == NULL) {
tscError("failed to allocate memory for slow log client"); tscError("failed to allocate memory for slow log client");
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
return; return;
} }
pClient->lastCheckTime = taosGetMonoTimestampMs(); pClient->lastCheckTime = taosGetMonoTimestampMs();
strcpy(pClient->path, path); (void)strcpy(pClient->path, path);
pClient->offset = 0; pClient->offset = 0;
pClient->pFile = pFile; pClient->pFile = pFile;
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) { if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
taosMemoryFree(pClient); taosMemoryFree(pClient);
return; return;
} }
@ -423,7 +426,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
return NULL; return NULL;
} }
char* buf = pCont; char* buf = pCont;
strcat(buf++, "["); (void)strcat(buf++, "[");
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
if (readSize <= 0) { if (readSize <= 0) {
if (readSize < 0) { if (readSize < 0) {
@ -454,7 +457,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
static int64_t getFileSize(char* path) { static int64_t getFileSize(char* path) {
int64_t fileSize = 0; int64_t fileSize = 0;
if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
return -1; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
return fileSize; return fileSize;
@ -464,13 +467,13 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
char* fileName, void* pTransporter, SEpSet* epSet) { char* fileName, void* pTransporter, SEpSet* epSet) {
if (data == NULL) { if (data == NULL) {
taosMemoryFree(fileName); taosMemoryFree(fileName);
return -1; return TSDB_CODE_INVALID_PARA;
} }
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
if (pParam == NULL) { if (pParam == NULL) {
taosMemoryFree(data); taosMemoryFree(data);
taosMemoryFree(fileName); taosMemoryFree(fileName);
return -1; return terrno;
} }
pParam->data = data; pParam->data = data;
pParam->offset = offset; pParam->offset = offset;
@ -486,7 +489,7 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
SAppInstInfo* pInst = getAppInstByClusterId(clusterId); SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if (pInst == NULL) { if (pInst == NULL) {
tscError("failed to get app instance by clusterId:%" PRId64, clusterId); tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
return -1; return terrno;
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size); char* data = readFile(pFile, offset, size);
@ -495,13 +498,20 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
} }
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
if (fileName == NULL){
return;
}
int64_t size = getFileSize(*fileName); int64_t size = getFileSize(*fileName);
if (size <= offset) { if (size <= offset) {
processFileInTheEnd(pFile, *fileName); processFileInTheEnd(pFile, *fileName);
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
} else { } else {
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code); if (code == 0){
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
}else{
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code);
}
*fileName = NULL; *fileName = NULL;
} }
} }
@ -509,10 +519,12 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td
static void monitorSendSlowLogAtRunning(int64_t clusterId) { static void monitorSendSlowLogAtRunning(int64_t clusterId) {
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if (tmp == NULL) { if (tmp == NULL) {
tscError("failed to get slow log client by clusterId:%" PRId64, clusterId);
return; return;
} }
SlowLogClient* pClient = (*(SlowLogClient**)tmp); SlowLogClient* pClient = (*(SlowLogClient**)tmp);
if (pClient == NULL) { if (pClient == NULL) {
tscError("failed to get slow log client by clusterId:%" PRId64, clusterId);
return; return;
} }
int64_t size = getFileSize(pClient->path); int64_t size = getFileSize(pClient->path);
@ -574,14 +586,16 @@ static void monitorSendAllSlowLogAtQuit() {
} }
static void processFileRemoved(SlowLogClient* pClient) { static void processFileRemoved(SlowLogClient* pClient) {
taosUnLockFile(pClient->pFile); if (taosUnLockFile(pClient->pFile) != 0) {
taosCloseFile(&(pClient->pFile)); tscError("failed to unlock file:%s since %d", pClient->path, errno);
return;
}
(void)taosCloseFile(&(pClient->pFile));
TdFilePtr pFile = TdFilePtr pFile =
taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); tscError("failed to open file:%s since %d", pClient->path, errno);
tscError("failed to open file:%s since %s", pClient->path, terrstr());
} else { } else {
pClient->pFile = pFile; pClient->pFile = pFile;
} }
@ -594,7 +608,7 @@ static void monitorSendAllSlowLog() {
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
SlowLogClient* pClient = (*(SlowLogClient**)pIter); SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if (pClient == NULL) { if (pClient == NULL || pInst == NULL) {
taosHashCancelIterate(monitorSlowLogHash, pIter); taosHashCancelIterate(monitorSlowLogHash, pIter);
return; return;
} }
@ -604,7 +618,7 @@ static void monitorSendAllSlowLog() {
continue; continue;
} }
if (pInst != NULL && pClient->offset == 0) { if (pClient->offset == 0) {
int64_t size = getFileSize(pClient->path); int64_t size = getFileSize(pClient->path);
if (size <= 0) { if (size <= 0) {
if (size < 0) { if (size < 0) {
@ -657,7 +671,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
} }
char filename[PATH_MAX] = {0}; char filename[PATH_MAX] = {0};
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) { if (pFile == NULL) {
tscError("failed to open file:%s since %s", filename, terrstr()); tscError("failed to open file:%s since %s", filename, terrstr());
@ -665,7 +679,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
} }
if (taosLockFile(pFile) < 0) { if (taosLockFile(pFile) < 0) {
tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
continue; continue;
} }
char* tmp = taosStrdup(filename); char* tmp = taosStrdup(filename);
@ -673,7 +687,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
taosMemoryFree(tmp); taosMemoryFree(tmp);
} }
taosCloseDir(&pDir); (void)taosCloseDir(&pDir);
} }
static void* monitorThreadFunc(void* param) { static void* monitorThreadFunc(void* param) {
@ -707,7 +721,7 @@ static void* monitorThreadFunc(void* param) {
} }
MonitorSlowLogData* slowLogData = NULL; MonitorSlowLogData* slowLogData = NULL;
taosReadQitem(monitorQueue, (void**)&slowLogData); (void)taosReadQitem(monitorQueue, (void**)&slowLogData);
if (slowLogData != NULL) { if (slowLogData != NULL) {
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) {
if (slowLogData->pFile != NULL) { if (slowLogData->pFile != NULL) {
@ -735,7 +749,7 @@ static void* monitorThreadFunc(void* param) {
if (quitCnt == 0) { if (quitCnt == 0) {
monitorSendAllSlowLog(); monitorSendAllSlowLog();
} }
tsem2_timewait(&monitorSem, 100); (void)tsem2_timewait(&monitorSem, 100);
} }
atomic_store_32(&slowLogFlag, -2); atomic_store_32(&slowLogFlag, -2);
return NULL; return NULL;
@ -743,15 +757,22 @@ static void* monitorThreadFunc(void* param) {
static int32_t tscMonitortInit() { static int32_t tscMonitortInit() {
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); if (taosThreadAttrInit(&thAttr) != 0) {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); tscError("failed to init thread attr since %s", strerror(errno));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
tscError("failed to set thread attr since %s", strerror(errno));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
TdThread monitorThread; TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
tscError("failed to create monitor thread since %s", strerror(errno)); tscError("failed to create monitor thread since %s", strerror(errno));
return -1; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
return 0; return 0;
} }
@ -767,15 +788,14 @@ static void tscMonitorStop() {
} }
int32_t monitorInit() { int32_t monitorInit() {
int32_t code; int32_t code = 0;
tscInfo("[monitor] tscMonitor init"); tscInfo("[monitor] tscMonitor init");
monitorCounterHash = monitorCounterHash =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) { if (monitorCounterHash == NULL) {
tscError("failed to create monitorCounterHash"); tscError("failed to create monitorCounterHash");
terrno = TSDB_CODE_OUT_OF_MEMORY; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
return -1;
} }
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
@ -783,46 +803,39 @@ int32_t monitorInit() {
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) { if (monitorSlowLogHash == NULL) {
tscError("failed to create monitorSlowLogHash"); tscError("failed to create monitorSlowLogHash");
terrno = TSDB_CODE_OUT_OF_MEMORY; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
return -1;
} }
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) { if (monitorTimer == NULL) {
tscError("failed to create monitor timer"); tscError("failed to create monitor timer");
terrno = TSDB_CODE_OUT_OF_MEMORY; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
return -1;
} }
if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0) { code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
terrno = TSDB_CODE_TSC_INTERNAL_ERROR; if (code != 0) {
return -1; return code;
} }
if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) { if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) {
tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr()); tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
return terrno; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
} }
if (tsem2_init(&monitorSem, 0, 0) != 0) { if (tsem2_init(&monitorSem, 0, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tscError("sem init error since %s", terrstr()); tscError("sem init error since %s", terrstr());
return -1; return TAOS_SYSTEM_ERROR(errno);
} }
code = taosOpenQueue(&monitorQueue); code = taosOpenQueue(&monitorQueue);
if (code) { if (code) {
terrno = code;
tscError("open queue error since %s", terrstr()); tscError("open queue error since %s", terrstr());
return -1; return TAOS_GET_TERRNO(code);
} }
taosInitRWLatch(&monitorLock); taosInitRWLatch(&monitorLock);
if (tscMonitortInit() != 0) { return tscMonitortInit();
return -1;
}
return 0;
} }
void monitorClose() { void monitorClose() {
@ -838,13 +851,13 @@ void monitorClose() {
taosHashCleanup(monitorSlowLogHash); taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer); taosTmrCleanUp(monitorTimer);
taosCloseQueue(monitorQueue); taosCloseQueue(monitorQueue);
tsem2_destroy(&monitorSem); (void)tsem2_destroy(&monitorSem);
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
int32_t code; int32_t code = 0;
MonitorSlowLogData* slowLogData; MonitorSlowLogData* slowLogData = NULL;
if (atomic_load_32(&slowLogFlag) == -2) { if (atomic_load_32(&slowLogFlag) == -2) {
tscError("[monitor] slow log thread is exiting"); tscError("[monitor] slow log thread is exiting");
@ -854,13 +867,13 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData); code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
if (code) { if (code) {
tscError("[monitor] failed to allocate slow log data"); tscError("[monitor] failed to allocate slow log data");
return terrno = code; return code;
} }
*slowLogData = data; *slowLogData = data;
tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId, tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
queueTypeStr[slowLogData->type], slowLogData->data); queueTypeStr[slowLogData->type], slowLogData->data);
if (taosWriteQitem(monitorQueue, slowLogData) == 0) { if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
tsem2_post(&monitorSem); (void)tsem2_post(&monitorSem);
} else { } else {
monitorFreeSlowLogData(slowLogData); monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData); taosFreeQitem(slowLogData);

View File

@ -69,7 +69,7 @@ void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) {
} else { } else {
clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost); clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost);
} }
releaseTscObj(rid); (void)releaseTscObj(rid);
} else { } else {
tscLog("slowQueryLog, not found rid"); tscLog("slowQueryLog, not found rid");
} }

View File

@ -65,7 +65,7 @@ void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) {
} else { } else {
clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type); clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type);
} }
releaseTscObj(rid); (void)releaseTscObj(rid);
} else { } else {
tscLog("sqlReqLog, not found rid"); tscLog("sqlReqLog, not found rid");
} }

View File

@ -40,7 +40,9 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)); if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0){
tscError("failed to remove meta data for table");
}
} }
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
@ -48,7 +50,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
@ -61,7 +63,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -70,7 +72,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (NULL == pTscObj->pAppInfo) { if (NULL == pTscObj->pAppInfo) {
code = TSDB_CODE_TSC_DISCONNECTED; code = TSDB_CODE_TSC_DISCONNECTED;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -78,14 +80,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) { if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
code = TSDB_CODE_TSC_INVALID_VERSION; code = TSDB_CODE_TSC_INVALID_VERSION;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) { if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) {
tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer); tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -95,14 +97,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = TSDB_CODE_TIME_UNSYNCED; code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff:%ds is too big", delta); tscError("time diff:%ds is too big", delta);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -111,8 +113,10 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SEpSet dstEpSet = connectRsp.epSet; SEpSet dstEpSet = connectRsp.epSet;
if (srcEpSet.numOfEps == 1) { if (srcEpSet.numOfEps == 1) {
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn); dstEpSet.eps[dstEpSet.inUse].fqdn) != 0){
tscError("failed to set default addr for rpc");
}
updateEpSet = 0; updateEpSet = 0;
} }
} }
@ -158,33 +162,35 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
MonitorSlowLogData data = {0}; MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId; data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG; data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data); (void)monitorPutData2MonitorQueue(data); // ignore
monitorClientSlowQueryInit(connectRsp.clusterId); monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId); monitorClientSQLReqInit(connectRsp.clusterId);
} }
} }
taosThreadMutexLock(&clientHbMgr.lock); (void)taosThreadMutexLock(&clientHbMgr.lock);
SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
if (pAppHbMgr) { if (pAppHbMgr) {
hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); if (hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType) != 0){
tscError("0x%" PRIx64 " failed to register conn to hbMgr", pRequest->requestId);
}
} else { } else {
taosThreadMutexUnlock(&clientHbMgr.lock); (void)taosThreadMutexUnlock(&clientHbMgr.lock);
code = TSDB_CODE_TSC_DISCONNECTED; code = TSDB_CODE_TSC_DISCONNECTED;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
goto End; goto End;
} }
taosThreadMutexUnlock(&clientHbMgr.lock); (void)taosThreadMutexUnlock(&clientHbMgr.lock);
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
End: End:
if (pRequest) { if (pRequest) {
releaseRequest(pRequest->self); (void)releaseRequest(pRequest->self);
} }
taosMemoryFree(param); taosMemoryFree(param);
@ -216,7 +222,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
} else { } else {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
@ -225,28 +231,33 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0){
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); tscError("0x%" PRIx64 " failed to refresh db vg info", pRequest->requestId);
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); }
(void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0){
tscError("0x%" PRIx64 " failed to refresh db vg info", pRequest->requestId);
}
} }
} }
if (pRequest->body.queryFp) { if (pRequest->body.queryFp) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code || if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
TSDB_CODE_MND_DB_IN_DROPPING == code) { TSDB_CODE_MND_DB_IN_DROPPING == code) {
SUseDbRsp usedbRsp = {0}; SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0){
tscError("0x%" PRIx64 " deserialize SUseDbRsp failed", pRequest->requestId);
}
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { // cached in local if (usedbRsp.vgVersion >= 0) { // cached in local
@ -256,10 +267,12 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
tstrerror(code1)); tstrerror(code1));
} else { } else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0){
tscError("0x%" PRIx64 "catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
usedbRsp.uid);
}
} }
} }
tFreeSUsedbRsp(&usedbRsp); tFreeSUsedbRsp(&usedbRsp);
} }
@ -272,14 +285,16 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, pRequest->code); doRequestCallback(pRequest, pRequest->code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
SUseDbRsp usedbRsp = {0}; SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0){
tscError("0x%" PRIx64 " deserialize SUseDbRsp failed", pRequest->requestId);
}
if (strlen(usedbRsp.db) == 0) { if (strlen(usedbRsp.db) == 0) {
if (usedbRsp.errCode != 0) { if (usedbRsp.errCode != 0) {
@ -292,6 +307,9 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum); tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
for (int32_t i = 0; i < usedbRsp.vgNum; ++i) { for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i); SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
if (pInfo == NULL){
continue;
}
tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse); tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) { for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port); tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
@ -299,11 +317,12 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
SName name = {0}; SName name = {0};
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); if(tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
tscError("0x%" PRIx64 " failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
}
SUseDbOutput output = {0}; SUseDbOutput output = {0};
code = queryBuildUseDbOutput(&output, &usedbRsp); code = queryBuildUseDbOutput(&output, &usedbRsp);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
@ -317,26 +336,31 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code1)); tstrerror(code1));
} else { } else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0){
tscError("0x%" PRIx64 " failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
output.dbId);
}
output.dbVgroup = NULL; output.dbVgroup = NULL;
} }
} }
taosMemoryFreeClear(output.dbVgroup); taosMemoryFreeClear(output.dbVgroup);
tFreeSUsedbRsp(&usedbRsp); tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db); if(tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
tscError("0x%" PRIx64 " failed to get db name since %s", pRequest->requestId, tstrerror(code));
}
setConnectionDB(pRequest->pTscObj, db); setConnectionDB(pRequest->pTscObj, db);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, pRequest->code); doRequestCallback(pRequest, pRequest->code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return 0; return 0;
} }
@ -353,7 +377,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
SMCreateStbRsp createRsp = {0}; SMCreateStbRsp createRsp = {0};
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMCreateStbRsp(&coder, &createRsp); (void)tDecodeSMCreateStbRsp(&coder, &createRsp); // pMsg->len == 0
tDecoderClear(&coder); tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB; pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
@ -380,7 +404,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
@ -391,33 +415,41 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
} else { } else {
SDropDbRsp dropdbRsp = {0}; SDropDbRsp dropdbRsp = {0};
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0){
tscError("0x%" PRIx64 " deserialize SDropDbRsp failed", pRequest->requestId);
}
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0){
tscError("0x%" PRIx64 " failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
}
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN] = {0};
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != TSDB_CODE_SUCCESS) {
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); tscError("0x%" PRIx64 " failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); }
(void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
tscError("0x%" PRIx64 " failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
}
} }
} }
END:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
@ -430,7 +462,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SMAlterStbRsp alterRsp = {0}; SMAlterStbRsp alterRsp = {0};
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMAlterStbRsp(&coder, &alterRsp); (void)tDecodeSMAlterStbRsp(&coder, &alterRsp); // pMsg->len = 0
tDecoderClear(&coder); tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB; pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
@ -457,57 +489,72 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) { static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
int32_t code = 0;
int32_t line = 0;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
int32_t numOfCfg = taosArrayGetSize(pVars); int32_t numOfCfg = taosArrayGetSize(pVars);
blockDataEnsureCapacity(pBlock, numOfCfg); code = blockDataEnsureCapacity(pBlock, numOfCfg);
TSDB_CHECK_CODE(code, line, END);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SVariablesInfo* pInfo = taosArrayGet(pVars, i); SVariablesInfo* pInfo = taosArrayGet(pVars, i);
TSDB_CHECK_NULL(pInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, name, false); TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
code = colDataSetVal(pColInfo, i, name, false);
TSDB_CHECK_CODE(code, line, END);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++); pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false); TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
code = colDataSetVal(pColInfo, i, value, false);
TSDB_CHECK_CODE(code, line, END);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++); pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false); TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
code = colDataSetVal(pColInfo, i, scope, false);
TSDB_CHECK_CODE(code, line, END);
} }
pBlock->info.rows = numOfCfg; pBlock->info.rows = numOfCfg;
*block = pBlock; *block = pBlock;
return code;
return TSDB_CODE_SUCCESS; END:
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFree(pBlock);
return code;
} }
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
@ -577,55 +624,72 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) { static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
int32_t code = 0;
int32_t line = 0;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN; infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_INT; infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN; infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
blockDataEnsureCapacity(pBlock, 1); code = blockDataEnsureCapacity(pBlock, 1);
TSDB_CHECK_CODE(code, line, END);
SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
TSDB_CHECK_NULL(pResultCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1); SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
TSDB_CHECK_NULL(pIdCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2); SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
TSDB_CHECK_NULL(pReasonCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0}; char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0}; char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
if (pRsp->bAccepted) { if (pRsp->bAccepted) {
STR_TO_VARSTR(result, "accepted"); STR_TO_VARSTR(result, "accepted");
colDataSetVal(pResultCol, 0, result, false); code = colDataSetVal(pResultCol, 0, result, false);
colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false); TSDB_CHECK_CODE(code, line, END);
code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
TSDB_CHECK_CODE(code, line, END);
STR_TO_VARSTR(reason, "success"); STR_TO_VARSTR(reason, "success");
colDataSetVal(pReasonCol, 0, reason, false); code = colDataSetVal(pReasonCol, 0, reason, false);
TSDB_CHECK_CODE(code, line, END);
} else { } else {
STR_TO_VARSTR(result, "rejected"); STR_TO_VARSTR(result, "rejected");
colDataSetVal(pResultCol, 0, result, false); code = colDataSetVal(pResultCol, 0, result, false);
TSDB_CHECK_CODE(code, line, END);
colDataSetNULL(pIdCol, 0); colDataSetNULL(pIdCol, 0);
STR_TO_VARSTR(reason, "compaction is ongoing"); STR_TO_VARSTR(reason, "compaction is ongoing");
colDataSetVal(pReasonCol, 0, reason, false); code = colDataSetVal(pReasonCol, 0, reason, false);
TSDB_CHECK_CODE(code, line, END);
} }
pBlock->info.rows = 1; pBlock->info.rows = 1;
*block = pBlock; *block = pBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
END:
taosMemoryFree(pBlock);
taosArrayDestroy(pBlock->pDataBlock);
return code;
} }
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) { static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
@ -696,7 +760,7 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code); pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem_post(&pRequest->body.rspSem);
} }
return code; return code;
} }

View File

@ -14,15 +14,20 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
pTblBuf->buffOffset += pTblBuf->buffUnit; pTblBuf->buffOffset += pTblBuf->buffUnit;
} else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) { } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++); pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
if (NULL == pTblBuf->pCurBuff) {
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
}
*pBuf = pTblBuf->pCurBuff; *pBuf = pTblBuf->pCurBuff;
pTblBuf->buffOffset = pTblBuf->buffUnit; pTblBuf->buffOffset = pTblBuf->buffUnit;
} else { } else {
void* buff = taosMemoryMalloc(pTblBuf->buffSize); void* buff = taosMemoryMalloc(pTblBuf->buffSize);
if (NULL == buff) { if (NULL == buff) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
taosArrayPush(pTblBuf->pBufList, &buff); if(taosArrayPush(pTblBuf->pBufList, &buff) == NULL){
return terrno;
}
pTblBuf->buffIdx++; pTblBuf->buffIdx++;
pTblBuf->pCurBuff = buff; pTblBuf->pCurBuff = buff;
@ -48,7 +53,7 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
*param = node; *param = node;
atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); (void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1);
return true; return true;
} }
@ -58,7 +63,7 @@ void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
pStmt->queue.tail = param; pStmt->queue.tail = param;
pStmt->stat.bindDataNum++; pStmt->stat.bindDataNum++;
atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
} }
static int32_t stmtCreateRequest(STscStmt* pStmt) { static int32_t stmtCreateRequest(STscStmt* pStmt) {
@ -183,8 +188,8 @@ int32_t stmtBackupQueryFields(STscStmt* pStmt) {
if (NULL == pRes->fields || NULL == pRes->userFields) { if (NULL == pRes->fields || NULL == pRes->userFields) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size); (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -201,7 +206,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size); (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
} }
if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
@ -209,7 +214,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size); (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -219,10 +224,13 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
bool autoCreateTbl) { bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(tbName, tbFName); int32_t code = tNameExtractFullName(tbName, tbFName);
if (code != 0){
return code;
}
memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName)); (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); (void)strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1);
pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0; pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid; pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
@ -383,17 +391,21 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
taosMemoryFreeClear(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags);
} }
pStmt->bInfo.stbFName[0] = 0; pStmt->bInfo.stbFName[0] = 0;
;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void stmtFreeTableBlkList(STableColsData* pTb) { void stmtFreeTableBlkList(STableColsData* pTb) {
qResetStmtColumns(pTb->aCol, true); (void)qResetStmtColumns(pTb->aCol, true);
taosArrayDestroy(pTb->aCol); taosArrayDestroy(pTb->aCol);
} }
void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0); pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
if (NULL == pTblBuf->pCurBuff) {
tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
return;
}
pTblBuf->buffIdx = 1; pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head); pTblBuf->buffOffset = sizeof(*pQueue->head);
@ -438,7 +450,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
} }
qDestroyStmtDataBlock(pBlocks); qDestroyStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
} }
@ -505,7 +517,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx); qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols); taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
memset(&pStmt->sql, 0, sizeof(pStmt->sql)); (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
pStmt->sql.siInfo.tableColsReady = true; pStmt->sql.siInfo.tableColsReady = true;
STMT_DLOG_E("end to free SQL info"); STMT_DLOG_E("end to free SQL info");
@ -628,7 +640,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
tscDebug("tb %s not exist", pStmt->bInfo.tbFName); tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
stmtCleanBindInfo(pStmt); STMT_ERR_RET(stmtCleanBindInfo(pStmt));
STMT_ERR_RET(code); STMT_ERR_RET(code);
} }
@ -726,6 +738,9 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) { for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i); SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
*p = taosArrayInit(20, POINTER_BYTES); *p = taosArrayInit(20, POINTER_BYTES);
if (*p == NULL) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
} }
atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true); atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
@ -735,7 +750,7 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
// taosMemoryFree(pParam->pTbData); // taosMemoryFree(pParam->pTbData);
atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -757,7 +772,7 @@ void* stmtBindThreadFunc(void* param) {
continue; continue;
} }
stmtAsyncOutput(pStmt, asyncParam); (void)stmtAsyncOutput(pStmt, asyncParam);
} }
qInfo("stmt bind thread stopped"); qInfo("stmt bind thread stopped");
@ -767,8 +782,12 @@ void* stmtBindThreadFunc(void* param) {
int32_t stmtStartBindThread(STscStmt* pStmt) { int32_t stmtStartBindThread(STscStmt* pStmt) {
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); if (taosThreadAttrInit(&thAttr) != 0) {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) { if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -777,7 +796,7 @@ int32_t stmtStartBindThread(STscStmt* pStmt) {
pStmt->bindThreadInUse = true; pStmt->bindThreadInUse = true;
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -800,7 +819,9 @@ int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pTblBuf->pBufList, &buff); if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
pTblBuf->pCurBuff = buff; pTblBuf->pCurBuff = buff;
pTblBuf->buffIdx = 1; pTblBuf->buffIdx = 1;
@ -834,7 +855,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->errCode = TSDB_CODE_SUCCESS; pStmt->errCode = TSDB_CODE_SUCCESS;
if (NULL != pOptions) { if (NULL != pOptions) {
memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) { if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
pStmt->stbInterlaceMode = true; pStmt->stbInterlaceMode = true;
} }
@ -848,24 +869,26 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pStmt->sql.siInfo.pTableHash) { if (NULL == pStmt->sql.siInfo.pTableHash) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stmtClose(pStmt); (void)stmtClose(pStmt);
return NULL; return NULL;
} }
pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
if (NULL == pStmt->sql.siInfo.pTableCols) { if (NULL == pStmt->sql.siInfo.pTableCols) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stmtClose(pStmt); (void)stmtClose(pStmt);
return NULL; return NULL;
} }
code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf); code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
stmtInitQueue(pStmt); code = stmtInitQueue(pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = stmtStartBindThread(pStmt); code = stmtStartBindThread(pStmt);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
terrno = code; terrno = code;
stmtClose(pStmt); (void)stmtClose(pStmt);
return NULL; return NULL;
} }
} }
@ -904,7 +927,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
char* dbName = NULL; char* dbName = NULL;
if (qParseDbName(sql, length, &dbName)) { if (qParseDbName(sql, length, &dbName)) {
stmtSetDbName(stmt, dbName); STMT_ERR_RET(stmtSetDbName(stmt, dbName));
taosMemoryFreeClear(dbName); taosMemoryFreeClear(dbName);
} }
@ -928,7 +951,9 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
@ -966,7 +991,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
int32_t insert = 0; int32_t insert = 0;
stmtIsInsert(stmt, &insert); STMT_ERR_RET(stmtIsInsert(stmt, &insert));
if (0 == insert) { if (0 == insert) {
tscError("set tb name not available for none insert statement"); tscError("set tb name not available for none insert statement");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -977,18 +1002,18 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
STMT_ERR_RET(stmtGetFromCache(pStmt)); STMT_ERR_RET(stmtGetFromCache(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); (void)strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
} }
} else { } else {
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); (void)strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
pStmt->exec.pRequest->requestId++; pStmt->exec.pRequest->requestId++;
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
@ -1136,13 +1161,13 @@ int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) {
} }
if (0 == pStmt->sql.siInfo.firstName[0]) { if (0 == pStmt->sql.siInfo.firstName[0]) {
strcpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName); (void)strcpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName);
} }
param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
param->next = NULL; param->next = NULL;
atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
stmtEnqueue(pStmt, param); stmtEnqueue(pStmt, param);
@ -1162,7 +1187,9 @@ static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray**
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
} }
} }
@ -1221,8 +1248,8 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery)); STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));
if (pStmt->sql.pQuery->haveResultSet) { if (pStmt->sql.pQuery->haveResultSet) {
setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema, STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols); pStmt->sql.pQuery->numOfResCols));
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema); taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision); setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
} }
@ -1274,7 +1301,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
// param->tblData.aCol = taosArrayInit(20, POINTER_BYTES); // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
param->restoreTbCols = false; param->restoreTbCols = false;
strcpy(param->tblData.tbName, pStmt->bInfo.tbName); (void)strcpy(param->tblData.tbName, pStmt->bInfo.tbName);
} }
int64_t startUs3 = taosGetTimestampUs(); int64_t startUs3 = taosGetTimestampUs();
@ -1428,7 +1455,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
.requestId = pStmt->exec.pRequest->requestId, .requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self, .requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
pStmt->stat.ctgGetTbMetaNum++; pStmt->stat.ctgGetTbMetaNum++;
@ -1515,7 +1542,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else { } else {
if (pStmt->sql.stbInterlaceMode) { if (pStmt->sql.stbInterlaceMode) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
@ -1537,7 +1564,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
} }
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} }
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
@ -1562,7 +1589,7 @@ _return:
taosUsleep(1); taosUsleep(1);
} }
stmtCleanExecInfo(pStmt, (code ? false : true), false); STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
tFreeSSubmitRsp(pRsp); tFreeSSubmitRsp(pRsp);
@ -1582,7 +1609,7 @@ int stmtClose(TAOS_STMT* stmt) {
pStmt->queue.stopQueue = true; pStmt->queue.stopQueue = true;
if (pStmt->bindThreadInUse) { if (pStmt->bindThreadInUse) {
taosThreadJoin(pStmt->bindThread, NULL); (void)taosThreadJoin(pStmt->bindThread, NULL);
pStmt->bindThreadInUse = false; pStmt->bindThreadInUse = false;
} }
@ -1597,7 +1624,7 @@ int stmtClose(TAOS_STMT* stmt) {
pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4, pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
stmtCleanSQLInfo(pStmt); STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
taosMemoryFree(stmt); taosMemoryFree(stmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -2934,7 +2934,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
doFreeReqResultInfo(&pRspObj->resInfo); doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter);
if (pSW){ if (pSW){
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols));
} }
} }

View File

@ -41,7 +41,7 @@ void tmqGlobalMethod(JNIEnv *env) {
} }
if (g_vm == NULL) { if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm); (void)((*env)->GetJavaVM(env, &g_vm));
} }
jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback"); jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback");
@ -68,7 +68,7 @@ void tmqAssignmentMethod(JNIEnv *env) {
} }
if (g_vm == NULL) { if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm); (void)((*env)->GetJavaVM(env, &g_vm));
} }
jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment"); jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
@ -104,7 +104,7 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) {
param = NULL; param = NULL;
if (needDetach) { if (needDetach) {
(*g_vm)->DetachCurrentThread(g_vm); (void)((*g_vm)->DetachCurrentThread(g_vm));
} }
env = NULL; env = NULL;
} }
@ -126,7 +126,7 @@ void consumer_callback(tmq_t *tmq, int32_t code, void *param) {
param = NULL; param = NULL;
if (needDetach) { if (needDetach) {
(*g_vm)->DetachCurrentThread(g_vm); (void)((*g_vm)->DetachCurrentThread(g_vm));
} }
env = NULL; env = NULL;
} }
@ -493,7 +493,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->SetIntField(env, metadataObj, g_metadataColindexField, i); (*env)->SetIntField(env, metadataObj, g_metadataColindexField, i);
jstring metadataObjColname = (*env)->NewStringUTF(env, fields[i].name); jstring metadataObjColname = (*env)->NewStringUTF(env, fields[i].name);
(*env)->SetObjectField(env, metadataObj, g_metadataColnameField, metadataObjColname); (*env)->SetObjectField(env, metadataObj, g_metadataColnameField, metadataObjColname);
(*env)->CallBooleanMethod(env, arrayListObj, g_arrayListAddFp, metadataObj); (void)(*env)->CallBooleanMethod(env, arrayListObj, g_arrayListAddFp, metadataObj);
} }
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows);
@ -567,7 +567,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset); (*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin); (*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end); (*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
(*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment); (void)(*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
} }
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
return JNI_SUCCESS; return JNI_SUCCESS;

View File

@ -86,7 +86,7 @@ static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
return NULL; return NULL;
} }
char* buf = pCont; char* buf = pCont;
strcat(buf++, "["); (void)strcat(buf++, "[");
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
if (readSize <= 0) { if (readSize <= 0) {
if (readSize < 0){ if (readSize < 0){
@ -153,7 +153,7 @@ TEST(clientMonitorTest, ReadOneFile) {
const int size = 10; const int size = 10;
for(int i = 0; i < batch; i++){ for(int i = 0; i < batch; i++){
char value[size] = {0}; char value[size] = {0};
memset(value, '0' + i, size - 1); (void)memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr()); uError("failed to write len to file:%p since %s", pFile, terrstr());
} }

File diff suppressed because it is too large Load Diff

View File

@ -1148,13 +1148,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) { } else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
void *pCont = NULL; void *pCont = NULL;
int32_t contLen = 0; int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) { if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen)) {
mndTransSetRpcRsp(pTrans, pCont, contLen); mndTransSetRpcRsp(pTrans, pCont, contLen);
} }
} else if (pTrans->originRpcType == TDMT_MND_CREATE_INDEX) { } else if (pTrans->originRpcType == TDMT_MND_CREATE_INDEX) {
void *pCont = NULL; void *pCont = NULL;
int32_t contLen = 0; int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) { if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen)) {
mndTransSetRpcRsp(pTrans, pCont, contLen); mndTransSetRpcRsp(pTrans, pCont, contLen);
} }
} }

View File

@ -31,7 +31,7 @@ Testbase MndTestTopic::test;
void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
SCreateDbReq createReq = {0}; SCreateDbReq createReq = {0};
strcpy(createReq.db, dbname); (void)strcpy(createReq.db, dbname);
createReq.numOfVgroups = 2; createReq.numOfVgroups = 2;
createReq.buffer = -1; createReq.buffer = -1;
createReq.pageSize = -1; createReq.pageSize = -1;
@ -53,7 +53,8 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
tSerializeSCreateDbReq(pReq, contLen, &createReq); assert(pReq != NULL);
(void)tSerializeSCreateDbReq(pReq, contLen, &createReq);
*pContLen = contLen; *pContLen = contLen;
return pReq; return pReq;
@ -61,14 +62,15 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) { void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) {
SCMCreateTopicReq createReq = {0}; SCMCreateTopicReq createReq = {0};
strcpy(createReq.name, topicName); (void)strcpy(createReq.name, topicName);
createReq.igExists = 0; createReq.igExists = 0;
createReq.sql = (char*)sql; createReq.sql = (char*)sql;
createReq.ast = NULL; createReq.ast = NULL;
int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
tSerializeSCMCreateTopicReq(pReq, contLen, &createReq); assert(pReq != NULL);
(void)tSerializeSCMCreateTopicReq(pReq, contLen, &createReq);
*pContLen = contLen; *pContLen = contLen;
return pReq; return pReq;
@ -76,11 +78,12 @@ void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql,
void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen) { void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen) {
SMDropTopicReq dropReq = {0}; SMDropTopicReq dropReq = {0};
strcpy(dropReq.name, topicName); (void)strcpy(dropReq.name, topicName);
int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq); int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
tSerializeSMDropTopicReq(pReq, contLen, &dropReq); assert(pReq != NULL);
(void)tSerializeSMDropTopicReq(pReq, contLen, &dropReq);
*pContLen = contLen; *pContLen = contLen;
return pReq; return pReq;

View File

@ -533,6 +533,10 @@ int32_t qResetStmtColumns(SArray* pCols, bool deepClear) {
for (int32_t i = 0; i < colNum; ++i) { for (int32_t i = 0; i < colNum; ++i) {
SColData* pCol = (SColData*)taosArrayGet(pCols, i); SColData* pCol = (SColData*)taosArrayGet(pCols, i);
if (pCol == NULL){
qError("qResetStmtColumns column is NULL");
return TSDB_CODE_OUT_OF_MEMORY;
}
if (deepClear) { if (deepClear) {
tColDataDeepClear(pCol); tColDataDeepClear(pCol);
} else { } else {
@ -549,6 +553,10 @@ int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
for (int32_t i = 0; i < colNum; ++i) { for (int32_t i = 0; i < colNum; ++i) {
SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i); SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
if (pCol == NULL){
qError("qResetStmtDataBlock column is NULL");
return TSDB_CODE_OUT_OF_MEMORY;
}
if (deepClear) { if (deepClear) {
tColDataDeepClear(pCol); tColDataDeepClear(pCol);
} else { } else {