fix:[TD-31017]process return value in client for tmq

This commit is contained in:
wangmm0220 2024-07-25 17:15:41 +08:00
parent 893766714a
commit ebe7276c2c
10 changed files with 331 additions and 200 deletions

View File

@ -152,6 +152,9 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
#define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member)))
#define TAOS_GET_TERRNO(code) \
(terrno == 0 ? code : terrno)
#define TAOS_RETURN(CODE) \ #define TAOS_RETURN(CODE) \
do { \ do { \
return (terrno = (CODE)); \ return (terrno = (CODE)); \

View File

@ -162,7 +162,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
data.clusterId = pTscObj->pAppInfo->clusterId; data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_WRITE; data.type = SLOW_LOG_WRITE;
data.data = value; data.data = value;
if(monitorPutData2MonitorQueue(data) < 0){ if(monitorPutData2MonitorQueue(data) != 0){
taosMemoryFree(value); taosMemoryFree(value);
} }
@ -479,7 +479,10 @@ void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0); if (tsem2_init(&pRequest->body.rspSem, 0, 0) != 0) {
doDestroyRequest(pRequest);
return NULL;
}
if (registerRequest(pRequest, pTscObj)) { if (registerRequest(pRequest, pTscObj)) {
doDestroyRequest(pRequest); doDestroyRequest(pRequest);
@ -601,7 +604,7 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->msgBuf);
doFreeReqResultInfo(&pRequest->body.resInfo); doFreeReqResultInfo(&pRequest->body.resInfo);
tsem_destroy(&pRequest->body.rspSem); (void)tsem2_destroy(&pRequest->body.rspSem);
taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->targetTableList); taosArrayDestroy(pRequest->targetTableList);

View File

@ -324,7 +324,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
tsem_wait(&pRequest->body.rspSem); (void)tsem2_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1427,7 +1427,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); (void)tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg = const char* errorMsg =
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);

View File

@ -21,13 +21,10 @@ SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0}; char tmpSlowLogPath[PATH_MAX] = {0};
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
if (tsTempDir == NULL) {
return -1;
}
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
if (ret < 0) { if (ret < 0) {
tscError("failed to get tmp path ret:%d", ret); tscError("failed to get tmp path ret:%d", ret);
return ret; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
return 0; return 0;
} }
@ -71,10 +68,9 @@ static void destroyMonitorClient(void* data) {
if (pMonitor == NULL) { if (pMonitor == NULL) {
return; return;
} }
taosTmrStopA(&pMonitor->timer); (void)taosTmrStopA(&pMonitor->timer);
taosHashCleanup(pMonitor->counters); taosHashCleanup(pMonitor->counters);
taos_collector_registry_destroy(pMonitor->registry); (void)taos_collector_registry_destroy(pMonitor->registry);
// taos_collector_destroy(pMonitor->colector);
taosMemoryFree(pMonitor); taosMemoryFree(pMonitor);
} }
@ -142,15 +138,17 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
tscError("sendReport failed, out of memory, len:%d", tlen); tscError("sendReport failed, out of memory, len:%d", tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAILED; goto FAILED;
} }
tSerializeSStatisReq(buf, tlen, &sStatisReq); tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
if (tlen < 0) {
taosMemoryFree(buf);
goto FAILED;
}
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
tscError("sendReport failed, out of memory send info"); tscError("sendReport failed, out of memory send info");
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(buf); taosMemoryFree(buf);
goto FAILED; goto FAILED;
} }
@ -168,12 +166,12 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
FAILED: FAILED:
monitorFreeSlowLogDataEx(param); monitorFreeSlowLogDataEx(param);
return -1; return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
} }
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) { static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
char ts[50] = {0}; char ts[50] = {0};
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); (void)sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
if (NULL == pCont) { if (NULL == pCont) {
tscError("generateClusterReport failed, get null content."); tscError("generateClusterReport failed, get null content.");
@ -181,7 +179,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
} }
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
taos_collector_registry_clear_batch(registry); (void)taos_collector_registry_clear_batch(registry);
} }
taosMemoryFreeClear(pCont); taosMemoryFreeClear(pCont);
} }
@ -202,7 +200,7 @@ static void reportSendProcess(void* param, void* tmrId) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); (void)taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
taosRUnLockLatch(&monitorLock); taosRUnLockLatch(&monitorLock);
} }
@ -257,7 +255,6 @@ void monitorCreateClient(int64_t clusterId) {
tscError("failed to create monitor counters"); tscError("failed to create monitor counters");
goto fail; goto fail;
} }
// taosHashSetFreeFp(pMonitor->counters, destroyCounter);
if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) { if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
tscError("failed to put monitor client to hash"); tscError("failed to put monitor client to hash");
@ -300,10 +297,14 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL) return; if (newCounter == NULL) return;
MonitorClient* pMonitor = *ppMonitor; MonitorClient* pMonitor = *ppMonitor;
taos_collector_add_metric(pMonitor->colector, newCounter); if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){
tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter);
goto end;
}
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor"); tscError("failed to put counter to monitor");
taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
goto end; goto end;
} }
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
@ -332,7 +333,10 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName);
goto end; goto end;
} }
taos_counter_inc(*ppCounter, label_values); if (taos_counter_inc(*ppCounter, label_values) != 0){
tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName);
goto end;
}
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end: end:
@ -360,24 +364,23 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
tscInfo("[monitor] create slow log file:%s", path); tscInfo("[monitor] create slow log file:%s", path);
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); tscError("failed to open file:%s since %d", path, errno);
tscError("failed to open file:%s since %s", path, terrstr());
return; return;
} }
SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
if (pClient == NULL) { if (pClient == NULL) {
tscError("failed to allocate memory for slow log client"); tscError("failed to allocate memory for slow log client");
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
return; return;
} }
pClient->lastCheckTime = taosGetMonoTimestampMs(); pClient->lastCheckTime = taosGetMonoTimestampMs();
strcpy(pClient->path, path); (void)strcpy(pClient->path, path);
pClient->offset = 0; pClient->offset = 0;
pClient->pFile = pFile; pClient->pFile = pFile;
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) { if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
taosMemoryFree(pClient); taosMemoryFree(pClient);
return; return;
} }
@ -423,7 +426,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
return NULL; return NULL;
} }
char* buf = pCont; char* buf = pCont;
strcat(buf++, "["); (void)strcat(buf++, "[");
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
if (readSize <= 0) { if (readSize <= 0) {
if (readSize < 0) { if (readSize < 0) {
@ -454,7 +457,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
static int64_t getFileSize(char* path) { static int64_t getFileSize(char* path) {
int64_t fileSize = 0; int64_t fileSize = 0;
if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
return -1; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
return fileSize; return fileSize;
@ -464,13 +467,13 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
char* fileName, void* pTransporter, SEpSet* epSet) { char* fileName, void* pTransporter, SEpSet* epSet) {
if (data == NULL) { if (data == NULL) {
taosMemoryFree(fileName); taosMemoryFree(fileName);
return -1; return TSDB_CODE_INVALID_PARA;
} }
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
if (pParam == NULL) { if (pParam == NULL) {
taosMemoryFree(data); taosMemoryFree(data);
taosMemoryFree(fileName); taosMemoryFree(fileName);
return -1; return terrno;
} }
pParam->data = data; pParam->data = data;
pParam->offset = offset; pParam->offset = offset;
@ -486,7 +489,7 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
SAppInstInfo* pInst = getAppInstByClusterId(clusterId); SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if (pInst == NULL) { if (pInst == NULL) {
tscError("failed to get app instance by clusterId:%" PRId64, clusterId); tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
return -1; return terrno;
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size); char* data = readFile(pFile, offset, size);
@ -495,13 +498,20 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
} }
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
if (fileName == NULL){
return;
}
int64_t size = getFileSize(*fileName); int64_t size = getFileSize(*fileName);
if (size <= offset) { if (size <= offset) {
processFileInTheEnd(pFile, *fileName); processFileInTheEnd(pFile, *fileName);
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
} else { } else {
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code); if (code == 0){
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
}else{
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code);
}
*fileName = NULL; *fileName = NULL;
} }
} }
@ -509,10 +519,12 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td
static void monitorSendSlowLogAtRunning(int64_t clusterId) { static void monitorSendSlowLogAtRunning(int64_t clusterId) {
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if (tmp == NULL) { if (tmp == NULL) {
tscError("failed to get slow log client by clusterId:%" PRId64, clusterId);
return; return;
} }
SlowLogClient* pClient = (*(SlowLogClient**)tmp); SlowLogClient* pClient = (*(SlowLogClient**)tmp);
if (pClient == NULL) { if (pClient == NULL) {
tscError("failed to get slow log client by clusterId:%" PRId64, clusterId);
return; return;
} }
int64_t size = getFileSize(pClient->path); int64_t size = getFileSize(pClient->path);
@ -574,14 +586,16 @@ static void monitorSendAllSlowLogAtQuit() {
} }
static void processFileRemoved(SlowLogClient* pClient) { static void processFileRemoved(SlowLogClient* pClient) {
taosUnLockFile(pClient->pFile); if (taosUnLockFile(pClient->pFile) != 0) {
taosCloseFile(&(pClient->pFile)); tscError("failed to unlock file:%s since %d", pClient->path, errno);
return;
}
(void)taosCloseFile(&(pClient->pFile));
TdFilePtr pFile = TdFilePtr pFile =
taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); tscError("failed to open file:%s since %d", pClient->path, errno);
tscError("failed to open file:%s since %s", pClient->path, terrstr());
} else { } else {
pClient->pFile = pFile; pClient->pFile = pFile;
} }
@ -594,7 +608,7 @@ static void monitorSendAllSlowLog() {
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
SlowLogClient* pClient = (*(SlowLogClient**)pIter); SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if (pClient == NULL) { if (pClient == NULL || pInst == NULL) {
taosHashCancelIterate(monitorSlowLogHash, pIter); taosHashCancelIterate(monitorSlowLogHash, pIter);
return; return;
} }
@ -604,7 +618,7 @@ static void monitorSendAllSlowLog() {
continue; continue;
} }
if (pInst != NULL && pClient->offset == 0) { if (pClient->offset == 0) {
int64_t size = getFileSize(pClient->path); int64_t size = getFileSize(pClient->path);
if (size <= 0) { if (size <= 0) {
if (size < 0) { if (size < 0) {
@ -657,7 +671,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
} }
char filename[PATH_MAX] = {0}; char filename[PATH_MAX] = {0};
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) { if (pFile == NULL) {
tscError("failed to open file:%s since %s", filename, terrstr()); tscError("failed to open file:%s since %s", filename, terrstr());
@ -665,7 +679,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
} }
if (taosLockFile(pFile) < 0) { if (taosLockFile(pFile) < 0) {
tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
continue; continue;
} }
char* tmp = taosStrdup(filename); char* tmp = taosStrdup(filename);
@ -673,7 +687,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
taosMemoryFree(tmp); taosMemoryFree(tmp);
} }
taosCloseDir(&pDir); (void)taosCloseDir(&pDir);
} }
static void* monitorThreadFunc(void* param) { static void* monitorThreadFunc(void* param) {
@ -707,7 +721,7 @@ static void* monitorThreadFunc(void* param) {
} }
MonitorSlowLogData* slowLogData = NULL; MonitorSlowLogData* slowLogData = NULL;
taosReadQitem(monitorQueue, (void**)&slowLogData); (void)taosReadQitem(monitorQueue, (void**)&slowLogData);
if (slowLogData != NULL) { if (slowLogData != NULL) {
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) {
if (slowLogData->pFile != NULL) { if (slowLogData->pFile != NULL) {
@ -735,7 +749,7 @@ static void* monitorThreadFunc(void* param) {
if (quitCnt == 0) { if (quitCnt == 0) {
monitorSendAllSlowLog(); monitorSendAllSlowLog();
} }
tsem2_timewait(&monitorSem, 100); (void)tsem2_timewait(&monitorSem, 100);
} }
atomic_store_32(&slowLogFlag, -2); atomic_store_32(&slowLogFlag, -2);
return NULL; return NULL;
@ -743,12 +757,19 @@ static void* monitorThreadFunc(void* param) {
static int32_t tscMonitortInit() { static int32_t tscMonitortInit() {
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); if (taosThreadAttrInit(&thAttr) != 0) {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); tscError("failed to init thread attr since %s", strerror(errno));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
tscError("failed to set thread attr since %s", strerror(errno));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
TdThread monitorThread; TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
tscError("failed to create monitor thread since %s", strerror(errno)); tscError("failed to create monitor thread since %s", strerror(errno));
return -1; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
@ -767,15 +788,14 @@ static void tscMonitorStop() {
} }
int32_t monitorInit() { int32_t monitorInit() {
int32_t code; int32_t code = 0;
tscInfo("[monitor] tscMonitor init"); tscInfo("[monitor] tscMonitor init");
monitorCounterHash = monitorCounterHash =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) { if (monitorCounterHash == NULL) {
tscError("failed to create monitorCounterHash"); tscError("failed to create monitorCounterHash");
terrno = TSDB_CODE_OUT_OF_MEMORY; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
return -1;
} }
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
@ -783,46 +803,39 @@ int32_t monitorInit() {
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) { if (monitorSlowLogHash == NULL) {
tscError("failed to create monitorSlowLogHash"); tscError("failed to create monitorSlowLogHash");
terrno = TSDB_CODE_OUT_OF_MEMORY; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
return -1;
} }
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) { if (monitorTimer == NULL) {
tscError("failed to create monitor timer"); tscError("failed to create monitor timer");
terrno = TSDB_CODE_OUT_OF_MEMORY; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
return -1;
} }
if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0) { code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
terrno = TSDB_CODE_TSC_INTERNAL_ERROR; if (code != 0) {
return -1; return code;
} }
if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) { if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) {
tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr()); tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
return terrno; return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
} }
if (tsem2_init(&monitorSem, 0, 0) != 0) { if (tsem2_init(&monitorSem, 0, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tscError("sem init error since %s", terrstr()); tscError("sem init error since %s", terrstr());
return -1; return TAOS_SYSTEM_ERROR(errno);
} }
code = taosOpenQueue(&monitorQueue); code = taosOpenQueue(&monitorQueue);
if (code) { if (code) {
terrno = code;
tscError("open queue error since %s", terrstr()); tscError("open queue error since %s", terrstr());
return -1; return TAOS_GET_TERRNO(code);
} }
taosInitRWLatch(&monitorLock); taosInitRWLatch(&monitorLock);
if (tscMonitortInit() != 0) { return tscMonitortInit();
return -1;
}
return 0;
} }
void monitorClose() { void monitorClose() {
@ -838,13 +851,13 @@ void monitorClose() {
taosHashCleanup(monitorSlowLogHash); taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer); taosTmrCleanUp(monitorTimer);
taosCloseQueue(monitorQueue); taosCloseQueue(monitorQueue);
tsem2_destroy(&monitorSem); (void)tsem2_destroy(&monitorSem);
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
int32_t code; int32_t code = 0;
MonitorSlowLogData* slowLogData; MonitorSlowLogData* slowLogData = NULL;
if (atomic_load_32(&slowLogFlag) == -2) { if (atomic_load_32(&slowLogFlag) == -2) {
tscError("[monitor] slow log thread is exiting"); tscError("[monitor] slow log thread is exiting");
@ -854,13 +867,13 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData); code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
if (code) { if (code) {
tscError("[monitor] failed to allocate slow log data"); tscError("[monitor] failed to allocate slow log data");
return terrno = code; return code;
} }
*slowLogData = data; *slowLogData = data;
tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId, tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
queueTypeStr[slowLogData->type], slowLogData->data); queueTypeStr[slowLogData->type], slowLogData->data);
if (taosWriteQitem(monitorQueue, slowLogData) == 0) { if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
tsem2_post(&monitorSem); (void)tsem2_post(&monitorSem);
} else { } else {
monitorFreeSlowLogData(slowLogData); monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData); taosFreeQitem(slowLogData);

View File

@ -69,7 +69,7 @@ void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) {
} else { } else {
clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost); clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost);
} }
releaseTscObj(rid); (void)releaseTscObj(rid);
} else { } else {
tscLog("slowQueryLog, not found rid"); tscLog("slowQueryLog, not found rid");
} }

View File

@ -65,7 +65,7 @@ void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) {
} else { } else {
clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type); clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type);
} }
releaseTscObj(rid); (void)releaseTscObj(rid);
} else { } else {
tscLog("sqlReqLog, not found rid"); tscLog("sqlReqLog, not found rid");
} }

View File

@ -40,7 +40,10 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)); code = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
if (code != 0){
setErrno(pRequest, code);
}
} }
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
@ -48,7 +51,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
@ -61,7 +64,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -70,7 +73,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (NULL == pTscObj->pAppInfo) { if (NULL == pTscObj->pAppInfo) {
code = TSDB_CODE_TSC_DISCONNECTED; code = TSDB_CODE_TSC_DISCONNECTED;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -78,14 +81,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) { if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
code = TSDB_CODE_TSC_INVALID_VERSION; code = TSDB_CODE_TSC_INVALID_VERSION;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) { if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) {
tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer); tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -95,14 +98,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = TSDB_CODE_TIME_UNSYNCED; code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff:%ds is too big", delta); tscError("time diff:%ds is too big", delta);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
@ -111,8 +114,13 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SEpSet dstEpSet = connectRsp.epSet; SEpSet dstEpSet = connectRsp.epSet;
if (srcEpSet.numOfEps == 1) { if (srcEpSet.numOfEps == 1) {
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, code = rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn); dstEpSet.eps[dstEpSet.inUse].fqdn);
if (code != 0){
setErrno(pRequest, code);
(void)tsem2_post(&pRequest->body.rspSem);
goto End;
}
updateEpSet = 0; updateEpSet = 0;
} }
} }
@ -158,7 +166,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
MonitorSlowLogData data = {0}; MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId; data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG; data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data); (void)monitorPutData2MonitorQueue(data);
monitorClientSlowQueryInit(connectRsp.clusterId); monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId); monitorClientSQLReqInit(connectRsp.clusterId);
} }
@ -167,12 +175,17 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);
SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
if (pAppHbMgr) { if (pAppHbMgr) {
hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); code = hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
if (code != 0){
setErrno(pRequest, code);
(void)tsem2_post(&pRequest->body.rspSem);
goto End;
}
} else { } else {
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
code = TSDB_CODE_TSC_DISCONNECTED; code = TSDB_CODE_TSC_DISCONNECTED;
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
goto End; goto End;
} }
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
@ -180,7 +193,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
End: End:
if (pRequest) { if (pRequest) {
@ -216,7 +229,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
} else { } else {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
@ -225,28 +238,39 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); if (code != TSDB_CODE_SUCCESS) {
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); setErrno(pRequest, code);
}
if (code == TSDB_CODE_SUCCESS) {
(void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
}
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
} }
} }
if (pRequest->body.queryFp) { if (pRequest->body.queryFp) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
SUseDbRsp usedbRsp = {0};
code = tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
if (code != 0){
goto END;
}
if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code || if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
TSDB_CODE_MND_DB_IN_DROPPING == code) { TSDB_CODE_MND_DB_IN_DROPPING == code) {
SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { // cached in local if (usedbRsp.vgVersion >= 0) { // cached in local
@ -256,42 +280,35 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
tstrerror(code1)); tstrerror(code1));
} else { } else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); code = catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
if (code != 0){
goto END;
}
} }
} }
tFreeSUsedbRsp(&usedbRsp);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pData); goto END;
taosMemoryFree(pMsg->pEpSet);
setErrno(pRequest, code);
if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, pRequest->code);
} else {
tsem_post(&pRequest->body.rspSem);
} }
return code;
}
SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
if (strlen(usedbRsp.db) == 0) { if (strlen(usedbRsp.db) == 0) {
if (usedbRsp.errCode != 0) { if (usedbRsp.errCode != 0) {
return usedbRsp.errCode; code = usedbRsp.errCode;
} else { } else {
return TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
} }
goto END;
} }
tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum); tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
for (int32_t i = 0; i < usedbRsp.vgNum; ++i) { for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i); SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
if (pInfo == NULL){
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
goto END;
}
tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse); tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) { for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port); tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
@ -299,11 +316,13 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
SName name = {0}; SName name = {0};
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); code = tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
if (code != 0){
goto END;
}
SUseDbOutput output = {0}; SUseDbOutput output = {0};
code = queryBuildUseDbOutput(&output, &usedbRsp); code = queryBuildUseDbOutput(&output, &usedbRsp);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
@ -317,28 +336,32 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code1)); tstrerror(code1));
} else { } else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); code = catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
if (code == 0){
output.dbVgroup = NULL; output.dbVgroup = NULL;
} }
} }
}
taosMemoryFreeClear(output.dbVgroup); taosMemoryFreeClear(output.dbVgroup);
tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db); (void)tNameGetDbName(&name, db);
setConnectionDB(pRequest->pTscObj, db); setConnectionDB(pRequest->pTscObj, db);
END:
setErrno(pRequest, code);
tFreeSUsedbRsp(&usedbRsp);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, pRequest->code); doRequestCallback(pRequest, pRequest->code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return 0; return code;
} }
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
@ -353,7 +376,10 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
SMCreateStbRsp createRsp = {0}; SMCreateStbRsp createRsp = {0};
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMCreateStbRsp(&coder, &createRsp); code = tDecodeSMCreateStbRsp(&coder, &createRsp);
if (code != 0){
setErrno(pRequest, TAOS_GET_TERRNO(code));
}
tDecoderClear(&coder); tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB; pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
@ -380,7 +406,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
@ -391,33 +417,49 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
} else { } else {
SDropDbRsp dropdbRsp = {0}; SDropDbRsp dropdbRsp = {0};
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); code = tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
if (code != 0){
setErrno(pRequest, TAOS_GET_TERRNO(code));
goto END;
}
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); code = catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
if (code != 0){
setErrno(pRequest, TAOS_GET_TERRNO(code));
goto END;
}
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN] = {0};
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); if (code != 0){
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); setErrno(pRequest, TAOS_GET_TERRNO(code));
goto END;
}
(void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
if (code != 0){
setErrno(pRequest, TAOS_GET_TERRNO(code));
goto END;
}
} }
} }
END:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
@ -430,7 +472,10 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SMAlterStbRsp alterRsp = {0}; SMAlterStbRsp alterRsp = {0};
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMAlterStbRsp(&coder, &alterRsp); code = tDecodeSMAlterStbRsp(&coder, &alterRsp);
if (code != 0){
setErrno(pRequest, TAOS_GET_TERRNO(code));
}
tDecoderClear(&coder); tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB; pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
@ -457,57 +502,72 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) { static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
int32_t code = 0;
int32_t line = 0;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
int32_t numOfCfg = taosArrayGetSize(pVars); int32_t numOfCfg = taosArrayGetSize(pVars);
blockDataEnsureCapacity(pBlock, numOfCfg); code = blockDataEnsureCapacity(pBlock, numOfCfg);
TSDB_CHECK_CODE(code, line, END);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SVariablesInfo* pInfo = taosArrayGet(pVars, i); SVariablesInfo* pInfo = taosArrayGet(pVars, i);
TSDB_CHECK_NULL(pInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, name, false); TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
code = colDataSetVal(pColInfo, i, name, false);
TSDB_CHECK_CODE(code, line, END);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++); pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false); TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
code = colDataSetVal(pColInfo, i, value, false);
TSDB_CHECK_CODE(code, line, END);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++); pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false); TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
code = colDataSetVal(pColInfo, i, scope, false);
TSDB_CHECK_CODE(code, line, END);
} }
pBlock->info.rows = numOfCfg; pBlock->info.rows = numOfCfg;
*block = pBlock; *block = pBlock;
return code;
return TSDB_CODE_SUCCESS; END:
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFree(pBlock);
return code;
} }
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
@ -577,55 +637,72 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) { static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
int32_t code = 0;
int32_t line = 0;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN; infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_INT; infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN; infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY);
blockDataEnsureCapacity(pBlock, 1); code = blockDataEnsureCapacity(pBlock, 1);
TSDB_CHECK_CODE(code, line, END);
SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
TSDB_CHECK_NULL(pResultCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1); SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
TSDB_CHECK_NULL(pIdCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2); SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
TSDB_CHECK_NULL(pReasonCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0}; char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0}; char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
if (pRsp->bAccepted) { if (pRsp->bAccepted) {
STR_TO_VARSTR(result, "accepted"); STR_TO_VARSTR(result, "accepted");
colDataSetVal(pResultCol, 0, result, false); code = colDataSetVal(pResultCol, 0, result, false);
colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false); TSDB_CHECK_CODE(code, line, END);
code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
TSDB_CHECK_CODE(code, line, END);
STR_TO_VARSTR(reason, "success"); STR_TO_VARSTR(reason, "success");
colDataSetVal(pReasonCol, 0, reason, false); code = colDataSetVal(pReasonCol, 0, reason, false);
TSDB_CHECK_CODE(code, line, END);
} else { } else {
STR_TO_VARSTR(result, "rejected"); STR_TO_VARSTR(result, "rejected");
colDataSetVal(pResultCol, 0, result, false); code = colDataSetVal(pResultCol, 0, result, false);
TSDB_CHECK_CODE(code, line, END);
colDataSetNULL(pIdCol, 0); colDataSetNULL(pIdCol, 0);
STR_TO_VARSTR(reason, "compaction is ongoing"); STR_TO_VARSTR(reason, "compaction is ongoing");
colDataSetVal(pReasonCol, 0, reason, false); code = colDataSetVal(pReasonCol, 0, reason, false);
TSDB_CHECK_CODE(code, line, END);
} }
pBlock->info.rows = 1; pBlock->info.rows = 1;
*block = pBlock; *block = pBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
END:
taosMemoryFree(pBlock);
taosArrayDestroy(pBlock->pDataBlock);
return code;
} }
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) { static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
@ -696,7 +773,7 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code); pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); (void)tsem2_post(&pRequest->body.rspSem);
} }
return code; return code;
} }

View File

@ -14,15 +14,20 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
pTblBuf->buffOffset += pTblBuf->buffUnit; pTblBuf->buffOffset += pTblBuf->buffUnit;
} else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) { } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++); pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
if (NULL == pTblBuf->pCurBuff) {
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
}
*pBuf = pTblBuf->pCurBuff; *pBuf = pTblBuf->pCurBuff;
pTblBuf->buffOffset = pTblBuf->buffUnit; pTblBuf->buffOffset = pTblBuf->buffUnit;
} else { } else {
void* buff = taosMemoryMalloc(pTblBuf->buffSize); void* buff = taosMemoryMalloc(pTblBuf->buffSize);
if (NULL == buff) { if (NULL == buff) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
taosArrayPush(pTblBuf->pBufList, &buff); if(taosArrayPush(pTblBuf->pBufList, &buff) == NULL){
return terrno;
}
pTblBuf->buffIdx++; pTblBuf->buffIdx++;
pTblBuf->pCurBuff = buff; pTblBuf->pCurBuff = buff;
@ -48,7 +53,7 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
*param = node; *param = node;
atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); (void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1);
return true; return true;
} }
@ -58,7 +63,7 @@ void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
pStmt->queue.tail = param; pStmt->queue.tail = param;
pStmt->stat.bindDataNum++; pStmt->stat.bindDataNum++;
atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
} }
static int32_t stmtCreateRequest(STscStmt* pStmt) { static int32_t stmtCreateRequest(STscStmt* pStmt) {
@ -183,8 +188,8 @@ int32_t stmtBackupQueryFields(STscStmt* pStmt) {
if (NULL == pRes->fields || NULL == pRes->userFields) { if (NULL == pRes->fields || NULL == pRes->userFields) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size); (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -201,7 +206,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size); (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
} }
if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
@ -209,7 +214,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size); (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -219,10 +224,13 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
bool autoCreateTbl) { bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(tbName, tbFName); int32_t code = tNameExtractFullName(tbName, tbFName);
if (code != 0){
return code;
}
memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName)); (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); (void)strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1);
pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0; pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid; pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
@ -383,17 +391,21 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
taosMemoryFreeClear(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags);
} }
pStmt->bInfo.stbFName[0] = 0; pStmt->bInfo.stbFName[0] = 0;
;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void stmtFreeTableBlkList(STableColsData* pTb) { void stmtFreeTableBlkList(STableColsData* pTb) {
qResetStmtColumns(pTb->aCol, true); (void)qResetStmtColumns(pTb->aCol, true);
taosArrayDestroy(pTb->aCol); taosArrayDestroy(pTb->aCol);
} }
void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0); pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
if (NULL == pTblBuf->pCurBuff) {
tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
return;
}
pTblBuf->buffIdx = 1; pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head); pTblBuf->buffOffset = sizeof(*pQueue->head);
@ -505,7 +517,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx); qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols); taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
memset(&pStmt->sql, 0, sizeof(pStmt->sql)); (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
pStmt->sql.siInfo.tableColsReady = true; pStmt->sql.siInfo.tableColsReady = true;
STMT_DLOG_E("end to free SQL info"); STMT_DLOG_E("end to free SQL info");
@ -726,6 +738,9 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) { for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i); SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
*p = taosArrayInit(20, POINTER_BYTES); *p = taosArrayInit(20, POINTER_BYTES);
if (*p == NULL) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
} }
atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true); atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
@ -735,7 +750,7 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
// taosMemoryFree(pParam->pTbData); // taosMemoryFree(pParam->pTbData);
atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -757,7 +772,7 @@ void* stmtBindThreadFunc(void* param) {
continue; continue;
} }
stmtAsyncOutput(pStmt, asyncParam); (void)stmtAsyncOutput(pStmt, asyncParam);
} }
qInfo("stmt bind thread stopped"); qInfo("stmt bind thread stopped");
@ -767,8 +782,12 @@ void* stmtBindThreadFunc(void* param) {
int32_t stmtStartBindThread(STscStmt* pStmt) { int32_t stmtStartBindThread(STscStmt* pStmt) {
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); if (taosThreadAttrInit(&thAttr) != 0) {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) { if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -777,7 +796,7 @@ int32_t stmtStartBindThread(STscStmt* pStmt) {
pStmt->bindThreadInUse = true; pStmt->bindThreadInUse = true;
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -800,7 +819,9 @@ int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pTblBuf->pBufList, &buff); if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
pTblBuf->pCurBuff = buff; pTblBuf->pCurBuff = buff;
pTblBuf->buffIdx = 1; pTblBuf->buffIdx = 1;
@ -834,7 +855,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->errCode = TSDB_CODE_SUCCESS; pStmt->errCode = TSDB_CODE_SUCCESS;
if (NULL != pOptions) { if (NULL != pOptions) {
memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) { if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
pStmt->stbInterlaceMode = true; pStmt->stbInterlaceMode = true;
} }
@ -848,24 +869,26 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pStmt->sql.siInfo.pTableHash) { if (NULL == pStmt->sql.siInfo.pTableHash) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stmtClose(pStmt); (void)stmtClose(pStmt);
return NULL; return NULL;
} }
pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
if (NULL == pStmt->sql.siInfo.pTableCols) { if (NULL == pStmt->sql.siInfo.pTableCols) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stmtClose(pStmt); (void)stmtClose(pStmt);
return NULL; return NULL;
} }
code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf); code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
stmtInitQueue(pStmt); code = stmtInitQueue(pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = stmtStartBindThread(pStmt); code = stmtStartBindThread(pStmt);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
terrno = code; terrno = code;
stmtClose(pStmt); (void)stmtClose(pStmt);
return NULL; return NULL;
} }
} }
@ -904,7 +927,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
char* dbName = NULL; char* dbName = NULL;
if (qParseDbName(sql, length, &dbName)) { if (qParseDbName(sql, length, &dbName)) {
stmtSetDbName(stmt, dbName); STMT_ERR_RET(stmtSetDbName(stmt, dbName));
taosMemoryFreeClear(dbName); taosMemoryFreeClear(dbName);
} }
@ -928,7 +951,9 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
@ -977,18 +1002,18 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
STMT_ERR_RET(stmtGetFromCache(pStmt)); STMT_ERR_RET(stmtGetFromCache(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); (void)strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
} }
} else { } else {
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); (void)strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
pStmt->exec.pRequest->requestId++; pStmt->exec.pRequest->requestId++;
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
@ -1142,7 +1167,7 @@ int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) {
param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
param->next = NULL; param->next = NULL;
atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
stmtEnqueue(pStmt, param); stmtEnqueue(pStmt, param);
@ -1162,7 +1187,9 @@ static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray**
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
} }
} }
@ -1428,7 +1455,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
.requestId = pStmt->exec.pRequest->requestId, .requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self, .requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
pStmt->stat.ctgGetTbMetaNum++; pStmt->stat.ctgGetTbMetaNum++;
@ -1515,7 +1542,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else { } else {
if (pStmt->sql.stbInterlaceMode) { if (pStmt->sql.stbInterlaceMode) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
@ -1537,7 +1564,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
} }
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} }
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
@ -1562,7 +1589,7 @@ _return:
taosUsleep(1); taosUsleep(1);
} }
stmtCleanExecInfo(pStmt, (code ? false : true), false); STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
tFreeSSubmitRsp(pRsp); tFreeSSubmitRsp(pRsp);
@ -1582,7 +1609,7 @@ int stmtClose(TAOS_STMT* stmt) {
pStmt->queue.stopQueue = true; pStmt->queue.stopQueue = true;
if (pStmt->bindThreadInUse) { if (pStmt->bindThreadInUse) {
taosThreadJoin(pStmt->bindThread, NULL); (void)taosThreadJoin(pStmt->bindThread, NULL);
pStmt->bindThreadInUse = false; pStmt->bindThreadInUse = false;
} }
@ -1597,7 +1624,7 @@ int stmtClose(TAOS_STMT* stmt) {
pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4, pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
stmtCleanSQLInfo(pStmt); STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
taosMemoryFree(stmt); taosMemoryFree(stmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -41,7 +41,7 @@ void tmqGlobalMethod(JNIEnv *env) {
} }
if (g_vm == NULL) { if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm); (void)((*env)->GetJavaVM(env, &g_vm));
} }
jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback"); jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback");
@ -68,7 +68,7 @@ void tmqAssignmentMethod(JNIEnv *env) {
} }
if (g_vm == NULL) { if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm); (void)((*env)->GetJavaVM(env, &g_vm));
} }
jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment"); jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
@ -104,7 +104,7 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) {
param = NULL; param = NULL;
if (needDetach) { if (needDetach) {
(*g_vm)->DetachCurrentThread(g_vm); (void)((*g_vm)->DetachCurrentThread(g_vm));
} }
env = NULL; env = NULL;
} }
@ -126,7 +126,7 @@ void consumer_callback(tmq_t *tmq, int32_t code, void *param) {
param = NULL; param = NULL;
if (needDetach) { if (needDetach) {
(*g_vm)->DetachCurrentThread(g_vm); (void)((*g_vm)->DetachCurrentThread(g_vm));
} }
env = NULL; env = NULL;
} }

View File

@ -518,6 +518,10 @@ int32_t qResetStmtColumns(SArray* pCols, bool deepClear) {
for (int32_t i = 0; i < colNum; ++i) { for (int32_t i = 0; i < colNum; ++i) {
SColData* pCol = (SColData*)taosArrayGet(pCols, i); SColData* pCol = (SColData*)taosArrayGet(pCols, i);
if (pCol == NULL){
qError("qResetStmtColumns column is NULL");
return TSDB_CODE_OUT_OF_MEMORY;
}
if (deepClear) { if (deepClear) {
tColDataDeepClear(pCol); tColDataDeepClear(pCol);
} else { } else {
@ -534,6 +538,10 @@ int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
for (int32_t i = 0; i < colNum; ++i) { for (int32_t i = 0; i < colNum; ++i) {
SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i); SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
if (pCol == NULL){
qError("qResetStmtDataBlock column is NULL");
return TSDB_CODE_OUT_OF_MEMORY;
}
if (deepClear) { if (deepClear) {
tColDataDeepClear(pCol); tColDataDeepClear(pCol);
} else { } else {