Merge pull request #27795 from taosdata/fix/TD-31899

fix:[TD-31899] remove void(func)
This commit is contained in:
Pan Wei 2024-09-13 18:24:45 +08:00 committed by GitHub
commit bac26332eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 409 additions and 247 deletions

View File

@ -1636,8 +1636,9 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (pReq) {
tFreeClientHbReq(pReq);
if (TSDB_CODE_SUCCESS != taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey))) {
tscError("failed to remove connKey from activeInfo");
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (TSDB_CODE_SUCCESS != code) {
tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
taosHashRelease(pAppHbMgr->activeInfo, pReq);
(void)atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);

View File

@ -1315,9 +1315,10 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
if (TSDB_CODE_SUCCESS != refreshMeta(pRequest->pTscObj, pRequest)) {
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
code = refreshMeta(pRequest->pTscObj, pRequest);
if (code != 0){
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
}
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
@ -1524,7 +1525,7 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SName tableName;
SName tableName = {0};
toName(pTscObj->acctId, db, table, &tableName);
SVgroupInfo vgInfo;

View File

@ -68,9 +68,14 @@ static void destroyMonitorClient(void* data) {
if (pMonitor == NULL) {
return;
}
(void)taosTmrStopA(&pMonitor->timer);
if (!taosTmrStopA(&pMonitor->timer)) {
tscError("failed to stop timer, pMonitor:%p", pMonitor);
}
taosHashCleanup(pMonitor->counters);
(void)taos_collector_registry_destroy(pMonitor->registry);
int ret = taos_collector_registry_destroy(pMonitor->registry);
if (ret){
tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
}
taosMemoryFree(pMonitor);
}
@ -186,7 +191,10 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
}
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
(void)taos_collector_registry_clear_batch(registry);
int ret = taos_collector_registry_clear_batch(registry);
if (ret){
tscError("failed to clear registry, ret:%d", ret);
}
}
taosMemoryFreeClear(pCont);
}
@ -207,7 +215,10 @@ static void reportSendProcess(void* param, void* tmrId) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
(void)taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
bool reset = taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
if (!reset){
tscError("failed to reset timer, pMonitor:%p", pMonitor);
}
taosRUnLockLatch(&monitorLock);
}
@ -255,7 +266,11 @@ void monitorCreateClient(int64_t clusterId) {
goto fail;
}
(void)taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
if (r){
tscError("failed to register collector, ret:%d", r);
goto fail;
}
pMonitor->counters =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pMonitor->counters == NULL) {
@ -304,12 +319,18 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
MonitorClient* pMonitor = *ppMonitor;
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter);
int r = taos_counter_destroy(newCounter);
if (r){
tscError("failed to destroy counter, code: %d", r);
}
goto end;
}
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor");
(void)taos_counter_destroy(newCounter);
int r = taos_counter_destroy(newCounter);
if (r){
tscError("failed to destroy counter, code: %d", r);
}
goto end;
}
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
@ -374,7 +395,10 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
if (pClient == NULL) {
tscError("failed to allocate memory for slow log client");
(void)taosCloseFile(&pFile);
int32_t ret = taosCloseFile(&pFile);
if (ret != 0){
tscError("failed to close file:%p ret:%d", pFile, ret);
}
return;
}
pClient->lastCheckTime = taosGetMonoTimestampMs();
@ -383,7 +407,10 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
pClient->pFile = pFile;
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
(void)taosCloseFile(&pFile);
int32_t ret = taosCloseFile(&pFile);
if (ret != 0){
tscError("failed to close file:%p ret:%d", pFile, ret);
}
taosMemoryFree(pClient);
return;
}
@ -609,7 +636,11 @@ static void processFileRemoved(SlowLogClient* pClient) {
tscError("failed to unlock file:%s since %d", pClient->path, terrno);
return;
}
(void)taosCloseFile(&(pClient->pFile));
int32_t ret = taosCloseFile(&(pClient->pFile));
if (ret != 0){
tscError("failed to close file:%p ret:%d", pClient->pFile, ret);
return;
}
TdFilePtr pFile =
taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
@ -698,7 +729,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
}
if (taosLockFile(pFile) < 0) {
tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
(void)taosCloseFile(&pFile);
int32_t ret = taosCloseFile(&pFile);
if (ret != 0){
tscError("failed to close file:%p ret:%d", pFile, ret);
}
continue;
}
char* tmp = taosStrdup(filename);
@ -706,7 +740,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
taosMemoryFree(tmp);
}
(void)taosCloseDir(&pDir);
int32_t ret = taosCloseDir(&pDir);
if (ret != 0){
tscError("failed to close dir, ret:%d", ret);
}
}
static void* monitorThreadFunc(void* param) {
@ -852,7 +889,9 @@ void monitorClose() {
taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer);
taosCloseQueue(monitorQueue);
(void)tsem2_destroy(&monitorSem);
if(tsem2_destroy(&monitorSem) != 0) {
tscError("failed to destroy semaphore");
}
taosWUnLockLatch(&monitorLock);
}
@ -874,7 +913,9 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
queueTypeStr[slowLogData->type], slowLogData->data);
if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
(void)tsem2_post(&monitorSem);
if(tsem2_post(&monitorSem) != 0) {
tscError("failed to post semaphore");
}
} else {
if (taosCloseFile(&(slowLogData->pFile)) != 0) {
tscError("failed to close file:%p", slowLogData->pFile);

View File

@ -50,7 +50,9 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}
@ -58,12 +60,10 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
if (NULL == pRequest) {
goto End;
goto EXIT;
}
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
@ -71,23 +71,17 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (NULL == pTscObj->pAppInfo) {
code = TSDB_CODE_TSC_DISCONNECTED;
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
SConnectRsp connectRsp = {0};
if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
code = TSDB_CODE_TSC_INVALID_VERSION;
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) {
tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer);
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
@ -96,15 +90,11 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (delta > timestampDeltaLimit) {
code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff:%ds is too big", delta);
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
if (connectRsp.epSet.numOfEps == 0) {
code = TSDB_CODE_APP_ERROR;
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
@ -177,8 +167,6 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
} else {
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
code = TSDB_CODE_TSC_DISCONNECTED;
setErrno(pRequest, code);
(void)tsem_post(&pRequest->body.rspSem);
goto End;
}
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
@ -186,13 +174,19 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns);
(void)tsem_post(&pRequest->body.rspSem);
End:
if (code != 0){
setErrno(pRequest, code);
}
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
if (pRequest) {
(void)releaseRequest(pRequest->self);
}
EXIT:
taosMemoryFree(param);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
@ -245,7 +239,9 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp) {
doRequestCallback(pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}
@ -285,7 +281,9 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, pRequest->code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
@ -363,7 +361,9 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, pRequest->code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return 0;
}
@ -386,7 +386,12 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
SMCreateStbRsp createRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
(void)tDecodeSMCreateStbRsp(&coder, &createRsp); // pMsg->len == 0
if (pMsg->len > 0){
code = tDecodeSMCreateStbRsp(&coder, &createRsp); // pMsg->len == 0
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
}
tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
@ -413,7 +418,9 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}
@ -451,14 +458,15 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
}
END:
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}
@ -471,7 +479,12 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SMAlterStbRsp alterRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
(void)tDecodeSMAlterStbRsp(&coder, &alterRsp); // pMsg->len = 0
if (pMsg->len > 0){
code = tDecodeSMAlterStbRsp(&coder, &alterRsp); // pMsg->len == 0
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
}
tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
@ -498,7 +511,9 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
doRequestCallback(pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}
@ -649,7 +664,9 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) {
doRequestCallback(pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}
@ -801,7 +818,9 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
} else {
(void)tsem_post(&pRequest->body.rspSem);
if (tsem_post(&pRequest->body.rspSem) != 0){
tscError("failed to post semaphore");
}
}
return code;
}

View File

@ -960,6 +960,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
RAW_NULL_CHECK(pCmdMsg.pMsg);
if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
code = TSDB_CODE_INVALID_PARA;
taosMemoryFree(pCmdMsg.pMsg);
goto end;
}
@ -971,15 +972,15 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
taosMemoryFree(pCmdMsg.pMsg);
if (pRequest->code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
// ignore the return value
(void)catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
(void)catalogRemoveTableMeta(pCatalog, &tableName);
RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
}
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
@ -1022,8 +1023,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
SName pName = {0};
toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name,
&pName); // ignore the return value, always return pName
toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
STableMeta* pTableMeta = NULL;
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
@ -1064,6 +1064,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
RAW_NULL_CHECK(pCmdMsg.pMsg);
if (tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
code = TSDB_CODE_INVALID_PARA;
taosMemoryFree(pCmdMsg.pMsg);
goto end;
}
@ -1074,15 +1075,14 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
taosMemoryFree(pCmdMsg.pMsg);
if (pRequest->code == TSDB_CODE_SUCCESS) {
// ignore the error code
(void)catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
(void)catalogRemoveTableMeta(pCatalog, &tableName);
RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
}
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
@ -1230,7 +1230,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS) {
(void)removeMeta(pTscObj, pRequest->tableList, false);
RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
}
code = pRequest->code;
@ -1359,7 +1359,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS) {
(void)removeMeta(pTscObj, pRequest->tableList, false);
RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
}
code = pRequest->code;

View File

@ -116,7 +116,7 @@ static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo *conn, const char
return TSDB_CODE_SML_INVALID_DATA;
}
} else {
toName(info->taos->acctId, info->pRequest->pDb, pTabName, &pAuth.tbName); //ignore
toName(info->taos->acctId, info->pRequest->pDb, pTabName, &pAuth.tbName);
}
pAuth.type = type;
@ -2217,9 +2217,12 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
break;
}
taosMsleep(100);
(void)refreshMeta(request->pTscObj, request); //ignore return code,try again
uInfo("SML:%" PRIx64 " retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code,
tstrerror(code));
code = refreshMeta(request->pTscObj, request);
if (code != 0){
uInfo("SML:%" PRIx64 " refresh meta error code:%d, msg:%s", info->id, code, tstrerror(code));
}
smlDestroyInfo(info);
info = NULL;
taos_free_result(request);

View File

@ -771,7 +771,10 @@ void* stmtBindThreadFunc(void* param) {
continue;
}
(void)stmtAsyncOutput(pStmt, asyncParam);
int ret = stmtAsyncOutput(pStmt, asyncParam);
if (ret != 0){
qError("stmtAsyncOutput failed, reason:%s", tstrerror(ret));
}
}
qInfo("stmt bind thread stopped");

View File

@ -807,11 +807,18 @@ static void generateTimedTask(int64_t refId, int32_t type) {
if (code == TSDB_CODE_SUCCESS) {
*pTaskType = type;
if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
(void)tsem2_post(&tmq->rspSem);
if (tsem2_post(&tmq->rspSem) != 0){
tscError("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
}
}else{
taosFreeQitem(pTaskType);
}
}
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
code = taosReleaseRef(tmqMgmt.rsetId, refId);
if (code != 0){
tscError("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
}
}
void tmqAssignAskEpTask(void* param, void* tmrId) {
@ -824,8 +831,13 @@ void tmqReplayTask(void* param, void* tmrId) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) return;
(void)tsem2_post(&tmq->rspSem);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
if (tsem2_post(&tmq->rspSem) != 0){
tscError("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
}
int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
if (code != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", refId, code);
}
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
@ -835,17 +847,17 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0) {
goto _return;
goto END;
}
if (pMsg == NULL || param == NULL) {
code = TSDB_CODE_INVALID_PARA;
goto _return;
goto END;
}
SMqHbRsp rsp = {0};
code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
if (code != 0) {
goto _return;
goto END;
}
int64_t refId = (int64_t)param;
@ -866,13 +878,15 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
}
}
taosWUnLockLatch(&tmq->lock);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
code = taosReleaseRef(tmqMgmt.rsetId, refId);
if (code != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", refId, code);
}
}
tDestroySMqHbRsp(&rsp);
_return:
END:
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return code;
@ -974,9 +988,15 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER:
tDestroySMqHbReq(&req);
if (tmrId != NULL) {
(void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
if (!ret){
tscError("failed to reset timer fo tmq hb");
}
}
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
if (ret != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", refId, ret);
}
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
@ -995,9 +1015,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
return;
}
(void)taosReadAllQitems(pTmq->delayedTask, qall);
int32_t numOfItems = taosQallItemSize(qall);
int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
if (numOfItems == 0) {
taosFreeQall(qall);
return;
@ -1005,9 +1023,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
int8_t* pTaskType = NULL;
(void)taosGetQitem(qall, (void**)&pTaskType);
while (pTaskType != NULL) {
while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
code = askEp(pTmq, NULL, false, false);
if (code != 0) {
@ -1015,21 +1031,26 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
continue;
}
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
(void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->epTimer);
if (!ret){
tscError("failed to reset timer fo tmq ask ep");
}
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0);
(void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->commitTimer);
if (!ret){
tscError("failed to reset timer fo commit");
}
} else {
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
}
taosFreeQitem(pTaskType);
(void)taosGetQitem(qall, (void**)&pTaskType);
}
taosFreeQall(qall);
@ -1061,26 +1082,18 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* rspWrapper = NULL;
while (1) {
(void)taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper) {
while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(rspWrapper);
} else {
break;
}
}
rspWrapper = NULL;
(void)taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
(void)taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper) {
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
return;
}
while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(rspWrapper);
} else {
break;
}
}
}
@ -1095,7 +1108,9 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFree(pMsg->pEpSet);
}
(void)tsem2_post(&pParam->rspSem);
if (tsem2_post(&pParam->rspSem) != 0){
tscError("failed to post sem, subscribe cb");
}
return 0;
}
@ -1154,19 +1169,27 @@ void tmqFreeImpl(void* handle) {
}
taosFreeQall(tmq->qall);
(void)tsem2_destroy(&tmq->rspSem);
if(tsem2_destroy(&tmq->rspSem) != 0) {
tscError("failed to destroy sem in free tmq");
}
taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
taos_close_internal(tmq->pTscObj);
if (tmq->commitTimer) {
(void)taosTmrStopA(&tmq->commitTimer);
if (!taosTmrStopA(&tmq->commitTimer)) {
tscError("failed to stop commit timer");
}
}
if (tmq->epTimer) {
(void)taosTmrStopA(&tmq->epTimer);
if (!taosTmrStopA(&tmq->epTimer)) {
tscError("failed to stop ep timer");
}
}
if (tmq->hbLiveTimer) {
(void)taosTmrStopA(&tmq->hbLiveTimer);
if (!taosTmrStopA(&tmq->hbLiveTimer)) {
tscError("failed to stop hb timer");
}
}
taosMemoryFree(tmq);
@ -1312,7 +1335,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (code) {
terrno = code;
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
(void)tsem2_destroy(&pTmq->rspSem);
SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed;
}
@ -1419,7 +1441,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
void* abuf = buf;
(void)tSerializeSCMSubscribeReq(&abuf, &req);
tlen = tSerializeSCMSubscribeReq(&abuf, &req);
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
@ -1450,8 +1472,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
goto FAIL;
}
(void)tsem2_wait(&param.rspSem);
(void)tsem2_destroy(&param.rspSem);
if (tsem2_wait(&param.rspSem) != 0){
tscError("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
}
if(tsem2_destroy(&param.rspSem) != 0) {
tscError("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
}
if (param.rspErr != 0) {
code = param.rspErr;
@ -1656,6 +1682,8 @@ END:
(void)strcpy(pRspWrapper->topicName, pParam->topicName);
code = taosWriteQitem(tmq->mqueue, pRspWrapper);
if (code != 0) {
tmqFreeRspWrapper((SMqRspWrapper*)pRspWrapper);
taosFreeQitem(pRspWrapper);
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
}
}
@ -1663,10 +1691,17 @@ END:
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId);
if (tmq) (void)tsem2_post(&tmq->rspSem);
if (tmq) {
if (tsem2_post(&tmq->rspSem) != 0){
tscError("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
}
}
if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
ret = taosReleaseRef(tmqMgmt.rsetId, refId);
if (ret != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", refId, ret);
}
return code;
}
@ -1763,7 +1798,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pVgOffsetHashMap == NULL) {
(void)taosArrayDestroy(newTopics);
taosArrayDestroy(newTopics);
return false;
}
@ -1848,10 +1883,10 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->enableBatchMeta = tmq->enableBatchMeta;
}
int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) {
void tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
if (pRspObj == NULL) {
return terrno;
return;
}
pRspObj->resType = RES_TYPE__TMQ_META;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
@ -1860,13 +1895,12 @@ int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj**
(void)memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
*ppRspObj = pRspObj;
return 0;
}
int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) {
void tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) {
SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj));
if (pRspObj == NULL) {
return terrno;
return;
}
pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META;
tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
@ -1876,7 +1910,6 @@ int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMet
(void)memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp));
tscDebug("build batchmeta Rsp from wrapper");
*ppRspObj = pRspObj;
return 0;
}
void changeByteEndian(char* pData) {
@ -1973,31 +2006,29 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
}
}
int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
void tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqRspObj** ppRspObj) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
if (pRspObj == NULL) {
return terrno;
return;
}
pRspObj->common.resType = RES_TYPE__TMQ;
(void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common);
*ppRspObj = pRspObj;
return 0;
}
int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
void tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqTaosxRspObj** ppRspObj) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
if (pRspObj == NULL) {
return terrno;
return;
}
pRspObj->common.resType = RES_TYPE__TMQ_METADATA;
(void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common);
*ppRspObj = pRspObj;
return 0;
}
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
@ -2156,12 +2187,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
while (1) {
SMqRspWrapper* pRspWrapper = NULL;
(void)taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (pRspWrapper == NULL) {
(void)taosReadAllQitems(tmq->mqueue, tmq->qall);
(void)taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (pRspWrapper == NULL) {
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
return NULL;
}
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
return NULL;
}
}
@ -2239,7 +2269,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
} else { // build rsp
int64_t numOfRows = 0;
SMqRspObj* pRsp = NULL;
(void)tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp);
tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp);
tmq->totalRows += numOfRows;
pVg->emptyBlockReceiveTs = 0;
if (pRsp && tmq->replayEnable) {
@ -2293,7 +2323,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
// build rsp
SMqMetaRspObj* pRsp = NULL;
(void)tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp);
tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp);
taosMemoryFreeClear(pollRspWrapper->pEpset);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
@ -2331,7 +2361,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever,
tmq->consumerId, true);
SMqBatchMetaRspObj* pRsp = NULL;
(void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp);
tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp);
taosMemoryFreeClear(pollRspWrapper->pEpset);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
@ -2381,9 +2411,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
// build rsp
int64_t numOfRows = 0;
SMqTaosxRspObj* pRsp = NULL;
if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) != 0) {
tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId);
}
tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp);
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0};
@ -2551,7 +2579,10 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
}
if (code == 0) {
(void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
if (code != 0){
tscError("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
}
}
return code;
}
@ -2709,7 +2740,9 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
pInfo->code = code;
(void)tsem2_post(&pInfo->sem);
if (tsem2_post(&pInfo->sem) != 0){
tscError("failed to post rsp sem in commit cb");
}
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
@ -2738,10 +2771,14 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
}
(void)tsem2_wait(&pInfo->sem);
if (tsem2_wait(&pInfo->sem) != 0){
tscError("failed to wait sem for sync commit");
}
code = pInfo->code;
(void)tsem2_destroy(&pInfo->sem);
if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem for sync commit");
}
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
@ -2806,12 +2843,16 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId,
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
if (code == 0) {
(void)tsem2_wait(&pInfo->sem);
if (tsem2_wait(&pInfo->sem) != 0){
tscError("failed to wait sem for sync commit offset");
}
code = pInfo->code;
}
if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
(void)tsem2_destroy(&pInfo->sem);
if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem for sync commit offset");
}
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
@ -2864,11 +2905,11 @@ end:
}
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (param == NULL) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
if (pParam == NULL) {
goto FAIL;
}
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
@ -2906,22 +2947,30 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
taosFreeQitem(pWrapper);
} else {
if (taosWriteQitem(tmq->mqueue, pWrapper) != 0){
code = taosWriteQitem(tmq->mqueue, pWrapper);
if (code != 0) {
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
taosFreeQitem(pWrapper);
tscError("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
}
}
}
END:
(void)taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
{
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
if (ret != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
}
}
FAIL:
if (pParam->sync) {
if (pParam && pParam->sync) {
SAskEpInfo* pInfo = pParam->pParam;
if (pInfo) {
pInfo->code = code;
(void)tsem2_post(&pInfo->sem);
if (tsem2_post(&pInfo->sem) != 0){
tscError("failed to post rsp sem askep cb");
}
}
}
@ -2943,11 +2992,15 @@ int32_t syncAskEp(tmq_t* pTmq) {
int32_t code = askEp(pTmq, pInfo, true, false);
if (code == 0) {
(void)tsem2_wait(&pInfo->sem);
if (tsem2_wait(&pInfo->sem) != 0){
tscError("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
}
code = pInfo->code;
}
(void)tsem2_destroy(&pInfo->sem);
if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem sync ask ep");
}
taosMemoryFree(pInfo);
return code;
}
@ -3118,7 +3171,9 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
END:
pCommon->code = code;
if (total == pParam->totalReq) {
(void)tsem2_post(&pCommon->rsp);
if (tsem2_post(&pCommon->rsp) != 0) {
tscError("failed to post semaphore in get wal cb");
}
}
if (pMsg) {
@ -3133,8 +3188,10 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) {
if (pCommon == NULL) {
return;
}
(void)taosArrayDestroy(pCommon->pList);
(void)tsem2_destroy(&pCommon->rsp);
taosArrayDestroy(pCommon->pList);
if(tsem2_destroy(&pCommon->rsp) != 0) {
tscError("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
}
(void)taosThreadMutexDestroy(&pCommon->mutex);
taosMemoryFree(pCommon->pTopicName);
taosMemoryFree(pCommon);
@ -3170,7 +3227,9 @@ end:
taosMemoryFree(pMsg->pEpSet);
}
pParam->code = code;
(void)tsem2_post(&pParam->sem);
if (tsem2_post(&pParam->sem) != 0){
tscError("failed to post semaphore in tmCommittedCb");
}
return code;
}
@ -3234,12 +3293,16 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
if (code != 0) {
(void)tsem2_destroy(&pParam->sem);
if(tsem2_destroy(&pParam->sem) != 0) {
tscError("failed to destroy semaphore in get committed from server1");
}
taosMemoryFree(pParam);
return code;
}
(void)tsem2_wait(&pParam->sem);
if (tsem2_wait(&pParam->sem) != 0){
tscError("failed to wait semaphore in get committed from server");
}
code = pParam->code;
if (code == TSDB_CODE_SUCCESS) {
if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
@ -3249,7 +3312,9 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
}
(void)tsem2_destroy(&pParam->sem);
if(tsem2_destroy(&pParam->sem) != 0) {
tscError("failed to destroy semaphore in get committed from server2");
}
taosMemoryFree(pParam);
return code;
@ -3520,7 +3585,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
}
(void)tsem2_wait(&pCommon->rsp);
if (tsem2_wait(&pCommon->rsp) != 0){
tscError("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
}
code = pCommon->code;
if (code != TSDB_CODE_SUCCESS) {
@ -3583,7 +3650,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
}
SMqSeekParam* pParam = param;
pParam->code = code;
(void)tsem2_post(&pParam->sem);
if (tsem2_post(&pParam->sem) != 0){
tscError("failed to post sem in tmqSeekCb");
}
return 0;
}
@ -3680,14 +3749,20 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
(void)tsem2_destroy(&pParam->sem);
if(tsem2_destroy(&pParam->sem) != 0) {
tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
}
taosMemoryFree(pParam);
return code;
}
(void)tsem2_wait(&pParam->sem);
if (tsem2_wait(&pParam->sem) != 0){
tscError("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
}
code = pParam->code;
(void)tsem2_destroy(&pParam->sem);
if(tsem2_destroy(&pParam->sem) != 0) {
tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
}
taosMemoryFree(pParam);
tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));

View File

@ -88,6 +88,9 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
#endif
void toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
if (pName == NULL){
return;
}
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
snprintf(pName->dbname, sizeof(pName->dbname), "%s", pDbName);

View File

@ -199,7 +199,7 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons
taosWLockLatch(&pSub->lock);
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
if (pConsumerEp) {
(void)taosArrayDestroy(pConsumerEp->offsetRows);
taosArrayDestroy(pConsumerEp->offsetRows);
pConsumerEp->offsetRows = data->offsetRows;
data->offsetRows = NULL;
}

View File

@ -248,7 +248,7 @@ static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash
MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
}
(void)taosArrayDestroy(pConsumerEp->vgs);
taosArrayDestroy(pConsumerEp->vgs);
MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
actualRemoved++;
@ -682,8 +682,8 @@ END:
static void freeRebalanceItem(void *param) {
SMqRebInfo *pInfo = param;
(void)taosArrayDestroy(pInfo->newConsumers);
(void)taosArrayDestroy(pInfo->removedConsumers);
taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers);
}
// type = 0 remove type = 1 add
@ -738,9 +738,13 @@ static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHash
}
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
if (!pVgroup) {
(void)mndGetOrCreateRebSub(rebSubHash, key, NULL);
code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
if (code != 0){
mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
}else{
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
}
}
mndReleaseVgroup(pMnode, pVgroup);
}
taosRUnLockLatch(&pSub->lock);
@ -813,10 +817,10 @@ void mndRebCntDec() {
}
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
(void)taosArrayDestroy(rebOutput->newConsumers);
(void)taosArrayDestroy(rebOutput->modifyConsumers);
(void)taosArrayDestroy(rebOutput->removedConsumers);
(void)taosArrayDestroy(rebOutput->rebVgs);
taosArrayDestroy(rebOutput->newConsumers);
taosArrayDestroy(rebOutput->modifyConsumers);
taosArrayDestroy(rebOutput->removedConsumers);
taosArrayDestroy(rebOutput->rebVgs);
tDeleteSubscribeObj(rebOutput->pSub);
taosMemoryFree(rebOutput->pSub);
}
@ -858,7 +862,7 @@ static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
(void)taosArrayDestroy(pConsumerEp->vgs);
taosArrayDestroy(pConsumerEp->vgs);
MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
}
END:

View File

@ -302,7 +302,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
taosMemoryFreeClear(pTopic->ast);
taosMemoryFreeClear(pTopic->physicalPlan);
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
(void)taosArrayDestroy(pTopic->ntbColIds);
taosArrayDestroy(pTopic->ntbColIds);
return 0;
}
@ -467,7 +467,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
MND_TMQ_NULL_CHECK(topicObj.ntbColIds);
MND_TMQ_RETURN_CHECK(extractTopicTbInfo(pAst, &topicObj));
if (topicObj.ntbUid == 0) {
(void)taosArrayDestroy(topicObj.ntbColIds);
taosArrayDestroy(topicObj.ntbColIds);
topicObj.ntbColIds = NULL;
}
@ -505,7 +505,7 @@ END:
taosMemoryFreeClear(topicObj.physicalPlan);
taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.ast);
(void)taosArrayDestroy(topicObj.ntbColIds);
taosArrayDestroy(topicObj.ntbColIds);
if (topicObj.schema.nCols) {
taosMemoryFreeClear(topicObj.schema.pSchema);
}
@ -567,9 +567,15 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
{
SName dbname = {0};
(void)tNameFromString(&dbname, createTopicReq.subDbName, T_NAME_ACCT | T_NAME_DB); // ignore error
int32_t ret = tNameFromString(&dbname, createTopicReq.subDbName, T_NAME_ACCT | T_NAME_DB);
if (ret != 0){
mError("failed to parse db name:%s, ret:%d", createTopicReq.subDbName, ret);
}
SName topicName = {0};
(void)tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); // ignore error
ret = tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (ret != 0){
mError("failed to parse topic name:%s, ret:%d", createTopicReq.name, ret);
}
auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname,
createTopicReq.sql, strlen(createTopicReq.sql));
}
@ -735,7 +741,10 @@ END:
}
SName name = {0};
(void)tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); // ignore error
int32_t ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (ret != 0) {
mError("topic:%s, failed to drop since %s", dropReq.name, tstrerror(ret));
}
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
tFreeSMDropTopicReq(&dropReq);

View File

@ -344,7 +344,9 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
.pCont = pHandle->msg->pCont,
.contLen = pHandle->msg->contLen,
.info = pHandle->msg->info};
(void)tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg) != 0){
tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
}
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
@ -643,7 +645,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tDecoderInit(&dc, (uint8_t*)msg, msgLen);
ret = tDecodeSMqRebVgReq(&dc, &req);
// decode req
if (ret < 0) {
goto end;
}
@ -653,7 +654,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosRLockLatch(&pTq->lock);
STqHandle* pHandle = NULL;
(void)tqMetaGetHandle(pTq, req.subKey, &pHandle); // ignore return code
int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
if (code != 0){
tqInfo("vgId:%d, tq process sub req:%s, no such handle, create new one", pTq->pVnode->config.vgId, req.subKey);
}
taosRUnLockLatch(&pTq->lock);
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
@ -662,6 +666,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
STqHandle handle = {0};

View File

@ -390,8 +390,8 @@ END:
tdbFree(pKey);
tdbFree(pVal);
int32_t ret = tdbTbcClose(pCur);
if (code == 0 && ret != 0) {
code = ret;
if (ret != 0) {
tqError("failed to close tbc, ret:%d", ret);
}
return code;
}
@ -464,12 +464,10 @@ static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
END:
tdbFree(pKey);
tdbFree(pVal);
int32_t ret = tdbTbcClose(pCur);
if (code == 0) {
code = ret;
if (ret != 0) {
tqError("failed to close tbc, ret:%d", ret);
}
tDeleteSTqCheckInfo(&info);
return code;
}
@ -534,48 +532,44 @@ END:
taosMemoryFree(offset);
taosMemoryFree(offsetNew);
// return 0 always, so ignore
int32_t ret = tdbTbClose(pExecStore);
if (ret != 0) {
tqError("vgId:%d failed to close stream exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
tqError("failed to close tb, ret:%d", ret);
}
ret = tdbTbClose(pCheckStore);
if (ret != 0) {
tqError("vgId:%d failed to close stream check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
tqError("failed to close tb, ret:%d", ret);
}
ret = tdbClose(pMetaDB);
if (ret != 0) {
tqError("vgId:%d failed to close stream meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
tqError("failed to close tdb, ret:%d", ret);
}
return code;
}
void tqMetaClose(STQ* pTq) {
int32_t code = 0;
int32_t ret = 0;
if (pTq->pExecStore) {
code = tdbTbClose(pTq->pExecStore);
if (code) {
tqError("vgId:%d failed to close tq exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
ret = tdbTbClose(pTq->pExecStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
}
if (pTq->pCheckStore) {
code = tdbTbClose(pTq->pCheckStore);
if (code) {
tqError("vgId:%d failed to close tq check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
ret = tdbTbClose(pTq->pCheckStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
}
if (pTq->pOffsetStore) {
code = tdbTbClose(pTq->pOffsetStore);
if (code) {
tqError("vgId:%d failed to close tq offset store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
ret = tdbTbClose(pTq->pOffsetStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
}
code = tdbClose(pTq->pMetaDB);
if (code) {
tqError("vgId:%d failed to close tq meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
ret = tdbClose(pTq->pMetaDB);
if (ret != 0) {
tqError("failed to close tdb, ret:%d", ret);
}
}

View File

@ -77,7 +77,10 @@ _err:
}
void tqSnapReaderClose(STqSnapReader** ppReader) {
(void)tdbTbcClose((*ppReader)->pCur);
int32_t ret = tdbTbcClose((*ppReader)->pCur);
if (ret != 0){
tqError("vgId:%d, vnode snapshot tq reader close failed since %s", TD_VID((*ppReader)->pTq->pVnode), tstrerror(ret));
}
taosMemoryFree(*ppReader);
*ppReader = NULL;
}

View File

@ -112,8 +112,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
} else {
code = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
if (code) {
bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
if (!ret) {
tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
}
}

View File

@ -2889,7 +2889,7 @@ TEST(apiTest, catalogChkAuth_test) {
SUserAuthInfo authInfo = {0};
SUserAuthRes authRes = {0};
TAOS_STRCPY(authInfo.user, ctgTestUsername);
(void)toName(1, ctgTestDbname, ctgTestSTablename, &authInfo.tbName);
toName(1, ctgTestDbname, ctgTestSTablename, &authInfo.tbName);
authInfo.type = AUTH_TYPE_READ;
bool exists = false;
code = catalogChkAuthFromCache(pCtg, &authInfo, &authRes, &exists);

View File

@ -169,7 +169,8 @@ static EDealRes authSelectImpl(SNode* pNode, void* pContext) {
SNode* pTagCond = NULL;
STableNode* pTable = (STableNode*)pNode;
#ifdef TD_ENTERPRISE
SName name;
SName name = {0};
toName(pAuthCxt->pParseCxt->acctId, pTable->dbName, pTable->tableName, &name);
STableMeta* pTableMeta = NULL;
toName(pAuthCxt->pParseCxt->acctId, pTable->dbName, pTable->tableName, &name);
int32_t code = getTargetMetaImpl(

View File

@ -525,7 +525,7 @@ static int32_t getTargetMeta(STranslateContext* pCxt, const SName* pName, STable
}
static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
return getTargetMeta(pCxt, &name, pMeta, false);
}
@ -557,7 +557,7 @@ static int32_t getTableCfg(STranslateContext* pCxt, const SName* pName, STableCf
static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
@ -635,7 +635,7 @@ static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pNam
static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
SVgroupInfo* pInfo) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
return getTableHashVgroupImpl(pCxt, &name, pInfo);
}
@ -4686,7 +4686,7 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare
pRealTable->ratio = (NULL != pCxt->pExplainOpt ? pCxt->pExplainOpt->ratio : 1.0);
// The SRealTableNode created through ROLLUP already has STableMeta.
if (NULL == pRealTable->pMeta) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name);
code = getTargetMeta(pCxt, &name, &(pRealTable->pMeta), true);
if (TSDB_CODE_SUCCESS != code) {
@ -6238,7 +6238,7 @@ static void findVgroupsFromEqualTbname(STranslateContext* pCxt, SArray* aTbnames
}
for (int j = 0; j < nTbls; ++j) {
SName snameTb;
SName snameTb = {0};
char* tbName = taosArrayGetP(aTbnames, j);
toName(pCxt->pParseCxt->acctId, dbName, tbName, &snameTb);
SVgroupInfo vgInfo = {0};
@ -6265,7 +6265,7 @@ static void findVgroupsFromEqualTbname(STranslateContext* pCxt, SArray* aTbnames
}
static int32_t replaceToChildTableQuery(STranslateContext* pCxt, SEqCondTbNameTableInfo* pInfo) {
SName snameTb;
SName snameTb = {0};
int32_t code = 0;
SRealTableNode* pRealTable = pInfo->pRealTable;
char* tbName = taosArrayGetP(pInfo->aTbnames, 0);
@ -8798,7 +8798,7 @@ static int32_t buildRollupFuncs(SNodeList* pFuncs, SArray** pArray) {
static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
int32_t code = TSDB_CODE_SUCCESS;
SName tableName;
SName tableName = {0};
pReq->igExists = pStmt->ignoreExists;
pReq->delay1 = pStmt->pOptions->maxDelay1;
pReq->delay2 = pStmt->pOptions->maxDelay2;
@ -8873,20 +8873,20 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) {
SDropTableClause* pClause = (SDropTableClause*)nodesListGetNode(pStmt->pTables, 0);
SName tableName;
SName tableName = {0};
if (pStmt->withTsma) return TSDB_CODE_SUCCESS;
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName);
return doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists);
}
static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableStmt* pStmt) {
SName tableName;
SName tableName = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName);
return doTranslateDropSuperTable(pCxt, &tableName, pStmt->ignoreNotExists);
}
static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) {
SName tableName;
SName tableName = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName);
int32_t code = tNameExtractFullName(&tableName, pAlterReq->name);
if (TSDB_CODE_SUCCESS != code) return code;
@ -9403,7 +9403,7 @@ static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
}
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name);
int32_t code = tNameExtractFullName(&name, pReq->name);
if (TSDB_CODE_SUCCESS == code) {
@ -9556,7 +9556,7 @@ static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt*
}
static int32_t buildCreateTagIndexReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SCreateTagIndexReq* pReq) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name);
int32_t code = tNameExtractFullName(&name, pReq->idxName);
if (TSDB_CODE_SUCCESS == code) {
@ -9596,7 +9596,7 @@ static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateInde
static int32_t translateCreateNormalIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
int32_t code = 0;
SName name;
SName name = {0};
STableMeta* pMeta = NULL;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = getTargetMeta(pCxt, &name, &pMeta, false);
@ -9640,7 +9640,7 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
SMDropSmaReq dropSmaReq = {0};
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name);
int32_t code = tNameExtractFullName(&name, dropSmaReq.name);
if (TSDB_CODE_SUCCESS != code) return code;
@ -9714,7 +9714,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
}
int32_t code = TSDB_CODE_SUCCESS;
SName name;
SName name = {0};
if ('\0' != pStmt->subSTbName[0]) {
pReq->subType = TOPIC_SUB_TYPE__TABLE;
toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
@ -9842,7 +9842,7 @@ static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
SName name;
SName name = {0};
STableMeta* pMeta = NULL;
toName(pParCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
int32_t code = getTargetMeta(pCxt, &name, &pMeta, false);
@ -9966,7 +9966,7 @@ static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt)
#ifdef TD_ENTERPRISE
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
int32_t origCode = code;
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
SViewMeta* pMeta = NULL;
code = getViewMetaFromMetaCache(pCxt, &name, &pMeta);
@ -10084,7 +10084,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
#ifdef TD_ENTERPRISE
SRealTableNode* pRealTable = (SRealTableNode*)((SSelectStmt*)pStmt->pQuery)->pFromTable;
SName name;
SName name = {0};
STableMeta* pMeta = NULL;
int8_t tableType = 0;
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name);
@ -11325,7 +11325,7 @@ static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pSt
#endif
SParseSqlRes res = {.resType = PARSE_SQL_RES_SCHEMA};
SName name;
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN];
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, &name);
(void)tNameGetFullDbName(&name, dbFName);
@ -11367,7 +11367,7 @@ static int32_t translateDropView(STranslateContext* pCxt, SDropViewStmt* pStmt)
#endif
SCMDropViewReq dropReq = {0};
SName name;
SName name = {0};
int32_t code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
if (TSDB_CODE_SUCCESS == code) {
(void)tNameGetFullDbName(&name, dropReq.dbFName);
@ -11488,7 +11488,7 @@ static int32_t translateGrantTagCond(STranslateContext* pCxt, SGrantStmt* pStmt,
int32_t code = createRealTableForGrantTable(pStmt, &pTable);
if (TSDB_CODE_SUCCESS == code) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pTable->table.dbName, pTable->table.tableName, &name);
code = getTargetMeta(pCxt, &name, &(pTable->pMeta), false);
if (code) {
@ -11530,7 +11530,7 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
req.privileges = pStmt->privileges;
#ifdef TD_ENTERPRISE
if (0 != pStmt->tabName[0]) {
SName name;
SName name = {0};
STableMeta* pTableMeta = NULL;
toName(pCxt->pParseCxt->acctId, pStmt->objName, pStmt->tabName, &name);
code = getTargetMeta(pCxt, &name, &pTableMeta, true);
@ -11566,7 +11566,7 @@ static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
#ifdef TD_ENTERPRISE
if (0 != pStmt->tabName[0]) {
SName name;
SName name = {0};
STableMeta* pTableMeta = NULL;
toName(pCxt->pParseCxt->acctId, pStmt->objName, pStmt->tabName, &name);
code = getTargetMeta(pCxt, &name, &pTableMeta, true);
@ -11682,7 +11682,7 @@ static int32_t translateShowCreateTable(STranslateContext* pCxt, SShowCreateTabl
}
int32_t code = getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pDbCfg);
if (TSDB_CODE_SUCCESS == code) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = getTableCfg(pCxt, &name, (STableCfg**)&pStmt->pTableCfg);
}
@ -11693,7 +11693,7 @@ static int32_t translateShowCreateView(STranslateContext* pCxt, SShowCreateViewS
#ifndef TD_ENTERPRISE
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, &name);
return getViewMetaFromMetaCache(pCxt, &name, (SViewMeta**)&pStmt->pViewMeta);
#endif
@ -11962,7 +11962,7 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
SName* useTbName) {
SName name;
SName name = {0};
SDbCfgInfo pDbInfo = {0};
int32_t code = TSDB_CODE_SUCCESS;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name);
@ -12181,7 +12181,7 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SMDropSmaReq dropReq = {0};
SName name;
SName name = {0};
STableTSMAInfo* pTsma = NULL;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name);
code = tNameExtractFullName(&name, dropReq.name);
@ -13088,7 +13088,7 @@ static int32_t rewriteShowVgroups(STranslateContext* pCxt, SQuery* pQuery) {
static int32_t checkShowTags(STranslateContext* pCxt, const SShowStmt* pShow) {
int32_t code = 0;
SName name;
SName name = {0};
STableMeta* pTableMeta = NULL;
toName(pCxt->pParseCxt->acctId, ((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal, &name);
code = getTargetMeta(pCxt, &name, &pTableMeta, true);
@ -13449,7 +13449,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = checkCreateTable(pCxt, pStmt, false);
SVgroupInfo info = {0};
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroupImpl(pCxt, &name, &info);
@ -13697,7 +13697,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
}
if (TSDB_CODE_SUCCESS == code) {
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = collectUseTable(&name, pCxt->pTargetTables);
}
@ -14463,7 +14463,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
SName name;
SName name = {0};
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap);
if (TSDB_CODE_SUCCESS != code) {
@ -14722,7 +14722,7 @@ static int32_t buildRenameColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
}
if (TSDB_NORMAL_TABLE == pTableMeta->tableType) {
SArray* pTsmas = NULL;
SName tbName;
SName tbName = {0};
int32_t code = 0;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
if (pCxt->pMetaCache) code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas);