opt transport

This commit is contained in:
yihaoDeng 2024-09-07 17:29:35 +08:00
parent f797c8ed2e
commit 94891d5bff
14 changed files with 522 additions and 528 deletions

View File

@ -323,6 +323,7 @@
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_TASK_RELEASE, "task-release", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_SCH_MSG)

View File

@ -93,6 +93,10 @@ int32_t taosGetErrSize();
#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026)
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
#define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029)

View File

@ -335,8 +335,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg));
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
(void)tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS;
}
@ -392,8 +391,7 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
if (code) {
doRequestCallback(pRequest, code);
}
@ -1557,9 +1555,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
return code;
}
int64_t transporterId = 0;
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId,
body);
// int64_t transporterId = 0;
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
if (TSDB_CODE_SUCCESS != code) {
destroyTscObj(*pTscObj);
tscError("failed to send connect msg to server, code:%s", tstrerror(code));

View File

@ -16,19 +16,19 @@
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "clientMonitor.h"
#include "clientStmt.h"
#include "functionMgt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "version.h"
#include "tcompare.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
@ -120,7 +120,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
}
STscObj *pObj = NULL;
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
if (TSDB_CODE_SUCCESS == code) {
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
if (NULL == rid) {
@ -183,15 +183,15 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
return 0;
}
typedef struct SFetchWhiteListInfo{
int64_t connId;
typedef struct SFetchWhiteListInfo {
int64_t connId;
__taos_async_whitelist_fn_t userCbFn;
void* userParam;
void *userParam;
} SFetchWhiteListInfo;
int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SFetchWhiteListInfo* pInfo = (SFetchWhiteListInfo*)param;
TAOS* taos = &pInfo->connId;
int32_t fetchWhiteListCallbackFn(void *param, SDataBuf *pMsg, int32_t code) {
SFetchWhiteListInfo *pInfo = (SFetchWhiteListInfo *)param;
TAOS *taos = &pInfo->connId;
if (code != TSDB_CODE_SUCCESS) {
pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
taosMemoryFree(pMsg->pData);
@ -209,7 +209,7 @@ int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
return terrno;
}
uint64_t* pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
uint64_t *pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
if (pWhiteLists == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
@ -238,7 +238,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
return;
}
int64_t connId = *(int64_t*)taos;
int64_t connId = *(int64_t *)taos;
STscObj *pTsc = acquireTscObj(connId);
if (NULL == pTsc) {
@ -255,7 +255,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
return;
}
void* pReq = taosMemoryMalloc(msgLen);
void *pReq = taosMemoryMalloc(msgLen);
if (pReq == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
releaseTscObj(connId);
@ -269,7 +269,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
return;
}
SFetchWhiteListInfo* pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
SFetchWhiteListInfo *pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
if (pParam == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pReq);
@ -280,9 +280,9 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
pParam->connId = connId;
pParam->userCbFn = fp;
pParam->userParam = param;
SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
SMsgSendInfo *pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pSendInfo == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
releaseTscObj(connId);
@ -296,9 +296,8 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) {
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
tscWarn("failed to async send msg to server");
}
releaseTscObj(connId);
@ -443,7 +442,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
}
if(pRequest->inCallback) {
if (pRequest->inCallback) {
tscError("can not call taos_fetch_row before query callback ends.");
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL;
@ -454,7 +453,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo = NULL;
if (msg->common.resIter == -1) {
if(tmqGetNextResInfo(res, true, &pResultInfo) != 0){
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) {
return NULL;
}
} else {
@ -466,7 +465,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo->current += 1;
return pResultInfo->row;
} else {
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0){
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) {
return NULL;
}
@ -540,22 +539,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
len += sprintf(str + len, "%lf", dv);
} break;
case TSDB_DATA_TYPE_VARBINARY:{
void* data = NULL;
case TSDB_DATA_TYPE_VARBINARY: {
void *data = NULL;
uint32_t size = 0;
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
if(taosAscii2Hex(row[i], charLen, &data, &size) < 0){
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
if (taosAscii2Hex(row[i], charLen, &data, &size) < 0) {
break;
}
(void)memcpy(str + len, data, size);
len += size;
taosMemoryFree(data);
}break;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY || fields[i].type == TSDB_DATA_TYPE_GEOMETRY) {
if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY ||
fields[i].type == TSDB_DATA_TYPE_GEOMETRY) {
if (charLen > fields[i].bytes || charLen < 0) {
tscError("taos_print_row error binary. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
break;
@ -664,7 +664,8 @@ const char *taos_get_client_info() { return version; }
// return int32_t
int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -675,7 +676,8 @@ int taos_affected_rows(TAOS_RES *res) {
// return int64_t
int64_t taos_affected_rows64(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -725,7 +727,8 @@ int taos_select_db(TAOS *taos, const char *db) {
}
void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return;
}
@ -784,7 +787,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
return pRequest->code;
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *pResultInfo = NULL;
int32_t code = tmqGetNextResInfo(res, true, &pResultInfo);
int32_t code = tmqGetNextResInfo(res, true, &pResultInfo);
if (code != 0) return code;
pResultInfo->current = pResultInfo->numOfRows;
@ -807,7 +810,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *pResultInfo = NULL;
int32_t code = tmqGetNextResInfo(res, false, &pResultInfo);
int32_t code = tmqGetNextResInfo(res, false, &pResultInfo);
if (code != 0) {
(*numOfRows) = 0;
return 0;
@ -953,7 +956,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
(void)memcpy(&pRequest->parseMeta, pResultMeta, sizeof(*pResultMeta));
(void)memset(pResultMeta, 0, sizeof(*pResultMeta));
}
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
}
@ -999,7 +1002,7 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
}
pNewRequest->pQuery = NULL;
code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pNewRequest->pQuery);
code = nodesMakeNode(QUERY_NODE_QUERY, (SNode **)&pNewRequest->pQuery);
if (pNewRequest->pQuery) {
pNewRequest->pQuery->pRoot = pRoot;
pRoot = NULL;
@ -1271,7 +1274,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
(void)refreshMeta(pRequest->pTscObj, pRequest); //ignore return code,try again
(void)refreshMeta(pRequest->pTscObj, pRequest); // ignore return code,try again
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
@ -1285,7 +1288,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest);
SRequestObj* pUserReq = pRequest;
SRequestObj *pUserReq = pRequest;
(void)acquireRequest(pRequest->self);
while (pUserReq) {
if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
@ -1631,7 +1634,6 @@ TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) {
return pStmt;
}
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
@ -1874,7 +1876,7 @@ int taos_stmt_close(TAOS_STMT *stmt) {
return stmtClose(stmt);
}
int taos_set_conn_mode(TAOS* taos, int mode, int value) {
int taos_set_conn_mode(TAOS *taos, int mode, int value) {
if (taos == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
@ -1897,6 +1899,4 @@ int taos_set_conn_mode(TAOS* taos, int mode, int value) {
return 0;
}
char* getBuildInfo(){
return buildinfo;
}
char *getBuildInfo() { return buildinfo; }

View File

@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
}
MonitorSlowLogData tmp = {.clusterId = p->clusterId,
.type = p->type,
.fileName = p->fileName,
.pFile = p->pFile,
.offset = p->offset,
.data = NULL};
.type = p->type,
.fileName = p->fileName,
.pFile = p->pFile,
.offset = p->offset,
.data = NULL};
if (monitorPutData2MonitorQueue(tmp) == 0) {
p->fileName = NULL;
}
@ -161,10 +161,9 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0;
int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
FAILED:
FAILED:
monitorFreeSlowLogDataEx(param);
return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
}
@ -279,7 +278,7 @@ void monitorCreateClient(int64_t clusterId) {
return;
fail:
fail:
destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock);
}
@ -295,7 +294,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL) return;
MonitorClient* pMonitor = *ppMonitor;
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter);
goto end;
@ -308,7 +307,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
newCounter);
end:
end:
taosWUnLockLatch(&monitorLock);
}
@ -331,13 +330,13 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName);
goto end;
}
if (taos_counter_inc(*ppCounter, label_values) != 0){
if (taos_counter_inc(*ppCounter, label_values) != 0) {
tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName);
goto end;
}
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end:
end:
taosWUnLockLatch(&monitorLock);
}
@ -495,7 +494,7 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
}
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
if (fileName == NULL){
if (fileName == NULL) {
return;
}
int64_t size = getFileSize(*fileName);
@ -504,10 +503,11 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
} else {
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
if (code == 0){
if (code == 0) {
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
}else{
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code);
} else {
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId,
code);
}
*fileName = NULL;
}

View File

@ -37,7 +37,7 @@ struct SMqMgmt {
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
volatile int32_t tmqInitRes = 0; // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};
static int8_t pollFlag = 0;
static int8_t pollFlag = 0;
typedef struct {
int32_t code;
@ -121,7 +121,7 @@ struct tmq_t {
typedef struct SAskEpInfo {
int32_t code;
tsem2_t sem;
tsem2_t sem;
} SAskEpInfo;
enum {
@ -191,7 +191,7 @@ typedef struct {
} SMqPollRspWrapper;
typedef struct {
tsem2_t rspSem;
tsem2_t rspSem;
int32_t rspErr;
} SMqSubscribeCbParam;
@ -219,12 +219,12 @@ typedef struct SMqVgCommon {
} SMqVgCommon;
typedef struct SMqSeekParam {
tsem2_t sem;
tsem2_t sem;
int32_t code;
} SMqSeekParam;
typedef struct SMqCommittedParam {
tsem2_t sem;
tsem2_t sem;
int32_t code;
SMqVgOffset vgOffset;
} SMqCommittedParam;
@ -242,18 +242,18 @@ typedef struct {
int32_t waitingRspNum;
int32_t code;
tmq_commit_cb* callbackFn;
void* userParam;
void* userParam;
} SMqCommitCbParamSet;
typedef struct {
SMqCommitCbParamSet* params;
char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
int64_t consumerId;
} SMqCommitCbParam;
typedef struct SSyncCommitInfo {
tsem2_t sem;
tsem2_t sem;
int32_t code;
} SSyncCommitInfo;
@ -334,7 +334,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcasecmp(key, "session.timeout.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 6000 || tmp > 1800000){
if (tmp < 6000 || tmp > 1800000) {
return TMQ_CONF_INVALID;
}
conf->sessionTimeoutMs = tmp;
@ -343,7 +343,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){
if (tmp < 1000 || tmp >= conf->sessionTimeoutMs) {
return TMQ_CONF_INVALID;
}
conf->heartBeatIntervalMs = tmp;
@ -352,7 +352,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcasecmp(key, "max.poll.interval.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 1000 || tmp > INT32_MAX){
if (tmp < 1000 || tmp > INT32_MAX) {
return TMQ_CONF_INVALID;
}
conf->maxPollIntervalMs = tmp;
@ -515,7 +515,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return TSDB_CODE_INVALID_PARA;
@ -552,9 +552,8 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
int64_t transporterId = 0;
(void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
if (code != 0) {
(void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code;
@ -562,7 +561,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
return code;
}
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic **topic) {
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
@ -577,8 +576,8 @@ static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam,
int32_t rspNum, SMqCommitCbParamSet** ppParamSet) {
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
SMqCommitCbParamSet** ppParamSet) {
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -595,7 +594,7 @@ static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, voi
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
SMqClientTopic* pTopic = NULL;
int32_t code = getTopicByName(tmq, pTopicName, &pTopic);
int32_t code = getTopicByName(tmq, pTopicName, &pTopic);
if (code != 0) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
return code;
@ -723,7 +722,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
taosRUnLockLatch(&tmq->lock);
goto end;
}
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
numOfVgroups);
for (int32_t j = 0; j < numOfVgroups; j++) {
@ -769,7 +768,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
if (pParamSet->waitingRspNum != 1) {
// count down since waiting rsp num init as 1
code = commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
if (code != 0){
if (code != 0) {
tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
pParamSet = NULL;
goto end;
@ -824,14 +823,14 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
}
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0){
if (code != 0) {
goto _return;
}
if (pMsg == NULL || param == NULL) {
code = TSDB_CODE_INVALID_PARA;
goto _return;
}
SMqHbRsp rsp = {0};
code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
if (code != 0) {
@ -858,7 +857,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
taosWUnLockLatch(&tmq->lock);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
tDestroySMqHbRsp(&rsp);
_return:
@ -881,32 +880,32 @@ void tmqSendHbReq(void* param, void* tmrId) {
req.epoch = tmq->epoch;
req.pollFlag = atomic_load_8(&pollFlag);
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
if (req.topics == NULL){
if (req.topics == NULL) {
return;
}
taosRLockLatch(&tmq->lock);
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic == NULL) {
continue;
}
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
if (data == NULL){
if (data == NULL) {
continue;
}
(void)strcpy(data->topicName, pTopic->topicName);
data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
if (data->offsetRows == NULL){
if (data->offsetRows == NULL) {
continue;
}
for (int j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg == NULL){
if (pVg == NULL) {
continue;
}
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
if (offRows == NULL){
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
if (offRows == NULL) {
continue;
}
offRows->vgId = pVg->vgId;
@ -955,8 +954,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
tscError("tmqSendHbReq asyncSendMsgToServer failed");
}
@ -964,7 +962,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
(void)atomic_val_compare_exchange_8(&pollFlag, 1, 0);
OVER:
tDestroySMqHbReq(&req);
if(tmrId != NULL){
if (tmrId != NULL) {
(void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
}
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
@ -1006,14 +1004,15 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
continue;
}
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
(void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer);
(void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0);
(void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->commitTimer);
&pTmq->commitTimer);
} else {
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
}
@ -1100,16 +1099,16 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
taosRLockLatch(&tmq->lock);
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
if(topic == NULL) {
if (topic == NULL) {
tscError("topic is null");
continue;
}
char* tmp = strchr(topic->topicName, '.');
if(tmp == NULL) {
if (tmp == NULL) {
tscError("topic name is invalid:%s", topic->topicName);
continue;
}
if(tmq_list_append(*topics, tmp+ 1) != 0) {
if (tmq_list_append(*topics, tmp + 1) != 0) {
tscError("failed to append topic:%s", tmp + 1);
continue;
}
@ -1227,27 +1226,31 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
}
code = taosOpenQueue(&pTmq->mqueue);
if (code) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId);
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
pTmq->groupId);
SET_ERROR_MSG_TMQ("open queue failed")
goto _failed;
}
code = taosOpenQueue(&pTmq->delayedTask);
if (code) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId);
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
pTmq->groupId);
SET_ERROR_MSG_TMQ("open delayed task queue failed")
goto _failed;
}
code = taosAllocateQall(&pTmq->qall);
if (code) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId);
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
pTmq->groupId);
SET_ERROR_MSG_TMQ("allocate qall failed")
goto _failed;
}
if (conf->groupId[0] == 0) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId);
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
pTmq->groupId);
SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
goto _failed;
}
@ -1287,8 +1290,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init semaphore
if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, tstrerror(TAOS_SYSTEM_ERROR(errno)),
pTmq->groupId);
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId);
SET_ERROR_MSG_TMQ("init t_sem failed")
goto _failed;
}
@ -1371,7 +1374,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SName name = {0};
code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
if (code) {
tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, code);
tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
code);
goto FAIL;
}
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
@ -1382,7 +1386,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code = tNameExtractFullName(&name, topicFName);
if (code) {
tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, code);
tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
code);
taosMemoryFree(topicFName);
goto FAIL;
}
@ -1429,8 +1434,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
goto FAIL;
}
@ -1459,7 +1463,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
tmq->commitTimer =taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
tmq->commitTimer =
taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL;
@ -1516,12 +1521,12 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) {
}
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL;
tmq_t* tmq = NULL;
SMqPollRspWrapper* pRspWrapper = NULL;
int8_t rspType = 0;
int32_t vgId = 0;
uint64_t requestId = 0;
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
int8_t rspType = 0;
int32_t vgId = 0;
uint64_t requestId = 0;
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pMsg == NULL) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
@ -1530,7 +1535,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFreeClear(pMsg->pEpSet);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
int64_t refId = pParam->refId;
int64_t refId = pParam->refId;
vgId = pParam->vgId;
requestId = pParam->requestId;
tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
@ -1621,18 +1626,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
tDecoderClear(&decoder);
(void)memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId,requestId);
tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId,
requestId);
} else { // invalid rspType
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
}
END:
if (pRspWrapper){
if (pRspWrapper) {
pRspWrapper->code = code;
pRspWrapper->vgId = vgId;
(void)strcpy(pRspWrapper->topicName, pParam->topicName);
code = taosWriteQitem(tmq->mqueue, pRspWrapper);
if(code != 0){
if (code != 0) {
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
}
}
@ -1676,7 +1682,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
}
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
if (pVgEp == NULL){
if (pVgEp == NULL) {
continue;
}
(void)sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId);
@ -1712,7 +1718,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
clientVg.offsetInfo.committedOffset = offsetNew;
clientVg.offsetInfo.beginOffset = offsetNew;
}
if (taosArrayPush(pTopic->vgs, &clientVg) == NULL){
if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
pTopic->topicName);
freeClientVg(&clientVg);
@ -1773,7 +1779,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
.commitOffset = pVgCur->offsetInfo.committedOffset,
.numOfRows = pVgCur->numOfRows,
.vgStatus = pVgCur->vgStatus};
if(taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0){
if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) {
tscError("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId);
}
}
@ -1787,7 +1793,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
continue;
}
initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
if(taosArrayPush(newTopics, &topic) == NULL){
if (taosArrayPush(newTopics, &topic) == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName);
freeClientTopic(&topic);
}
@ -1919,7 +1925,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
pDataRsp->withSchema = true;
pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
if (pDataRsp->blockSchema == NULL){
if (pDataRsp->blockSchema == NULL) {
tscError("failed to allocate memory for blockSchema");
return;
}
@ -1938,7 +1944,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if (schema) {
if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL){
if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
tscError("failed to push schema into blockSchema");
continue;
}
@ -1947,7 +1953,8 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
}
}
int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj** ppRspObj) {
int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqRspObj** ppRspObj) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
if (pRspObj == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -1959,7 +1966,8 @@ int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, in
return 0;
}
int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqTaosxRspObj** ppRspObj) {
int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqTaosxRspObj** ppRspObj) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
if (pRspObj == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -2026,10 +2034,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) {
@ -2056,10 +2063,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic == NULL){
if (pTopic == NULL) {
continue;
}
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
if (pTopic->noPrivilege) {
tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
continue;
@ -2069,7 +2076,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if (pVg == NULL) {
continue;
}
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
@ -2220,8 +2227,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pVg->blockSleepForReplay = pRsp->rsp.sleepTime;
if (pVg->blockSleepForReplay > 0) {
if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
tscError("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%"PRId64, tmq->consumerId,
pVg->vgId, pVg->blockSleepForReplay);
tscError("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
}
}
}
@ -2302,7 +2309,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever,
tmq->consumerId, true);
SMqBatchMetaRspObj* pRsp = NULL;
(void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp) ;
(void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
return pRsp;
@ -2349,9 +2356,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
}
// build rsp
int64_t numOfRows = 0;
SMqTaosxRspObj* pRsp = NULL;
if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) !=0 ) {
int64_t numOfRows = 0;
SMqTaosxRspObj* pRsp = NULL;
if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) != 0) {
tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId);
}
tmq->totalRows += numOfRows;
@ -2471,7 +2478,7 @@ static void displayConsumeStatistics(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
static int32_t innerClose(tmq_t* tmq){
static int32_t innerClose(tmq_t* tmq) {
if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId);
return 0;
@ -2485,7 +2492,7 @@ static int32_t innerClose(tmq_t* tmq){
tmqSendHbReq((void*)(tmq->refId), NULL);
tmq_list_t* lst = tmq_list_new();
if (lst == NULL){
if (lst == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = tmq_subscribe(tmq, lst);
@ -2499,7 +2506,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
int32_t code = 0;
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) {
code = innerClose(tmq);
if(code == 0){
if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
}
}
@ -2514,12 +2521,12 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
int32_t code = 0;
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) {
code = innerClose(tmq);
if(code == 0){
if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
}
}
if (code == 0){
if (code == 0) {
(void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
}
return code;
@ -2562,13 +2569,13 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
return NULL;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
char *tmp = strchr(((SMqRspObjCommon*)res)->topic, '.');
char* tmp = strchr(((SMqRspObjCommon*)res)->topic, '.');
if (tmp == NULL) {
return NULL;
}
return tmp + 1;
} else if (TD_RES_TMQ_META(res)) {
char *tmp = strchr(((SMqMetaRspObj*)res)->topic, '.');
char* tmp = strchr(((SMqMetaRspObj*)res)->topic, '.');
if (tmp == NULL) {
return NULL;
}
@ -2584,13 +2591,13 @@ const char* tmq_get_db_name(TAOS_RES* res) {
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
char *tmp = strchr(((SMqRspObjCommon*)res)->db, '.');
char* tmp = strchr(((SMqRspObjCommon*)res)->db, '.');
if (tmp == NULL) {
return NULL;
}
return tmp + 1;
} else if (TD_RES_TMQ_META(res)) {
char *tmp = strchr(((SMqMetaRspObj*)res)->db, '.');
char* tmp = strchr(((SMqMetaRspObj*)res)->db, '.');
if (tmp == NULL) {
return NULL;
}
@ -2690,7 +2697,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
int32_t code = 0;
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
if(pInfo == NULL) {
if (pInfo == NULL) {
tscError("failed to allocate memory for sync commit");
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -2836,7 +2843,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (param == NULL) {
goto FAIL;
}
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
@ -2857,7 +2864,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
if (pParam->sync) {
SMqAskEpRsp rsp = {0};
if(tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL){
if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
(void)doUpdateLocalEp(tmq, head->epoch, &rsp);
}
tDeleteSMqAskEpRsp(&rsp);
@ -2871,10 +2878,10 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
(void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) == NULL){
if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) == NULL) {
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
taosFreeQitem(pWrapper);
}else{
} else {
(void)taosWriteQitem(tmq->mqueue, pWrapper);
}
}
@ -3015,7 +3022,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
if (common->withSchema) {
doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter);
if (pSW){
if (pSW) {
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols));
}
}
@ -3032,9 +3039,9 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
pRspObj->resInfo.precision = precision;
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, pRspObj->resInfo.numOfRows,
convertUcs4);
if (code != 0){
int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols,
pRspObj->resInfo.numOfRows, convertUcs4);
if (code != 0) {
return code;
}
*pResInfo = &pRspObj->resInfo;
@ -3062,18 +3069,18 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
code = tDecodeMqDataRsp(&decoder, &rsp);
tDecoderClear(&decoder);
if (code != 0){
if (code != 0) {
goto END;
}
SMqRspHead* pHead = pMsg->pData;
SMqRspHead* pHead = pMsg->pData;
tmq_topic_assignment assignment = {.begin = pHead->walsver,
.end = pHead->walever + 1,
.currentOffset = rsp.common.rspOffset.version,
.vgId = pParam->vgId};
(void)taosThreadMutexLock(&pCommon->mutex);
if(taosArrayPush(pCommon->pList, &assignment) == NULL){
if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
tscError("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
pParam->vgId, pCommon->pTopicName);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
@ -3184,7 +3191,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
taosMemoryFree(sendInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tsem2_init(&pParam->sem, 0, 0) != 0){
if (tsem2_init(&pParam->sem, 0, 0) != 0) {
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
taosMemoryFree(pParam);
@ -3198,8 +3205,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
sendInfo->fp = tmCommittedCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
if (code != 0) {
(void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam);
@ -3348,7 +3354,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = NULL;
int32_t code = getTopicByName(tmq, tname, &pTopic);
int32_t code = getTopicByName(tmq, tname, &pTopic);
if (code != 0) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
goto end;
@ -3358,10 +3364,10 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg == NULL){
if (pClientVg == NULL) {
continue;
}
int32_t type = pClientVg->offsetInfo.beginOffset.type;
int32_t type = pClientVg->offsetInfo.beginOffset.type;
if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
@ -3381,7 +3387,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg == NULL){
if (pClientVg == NULL) {
continue;
}
if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
@ -3410,7 +3416,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
if (tsem2_init(&pCommon->rsp, 0, 0) != 0){
if (tsem2_init(&pCommon->rsp, 0, 0) != 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
@ -3420,7 +3426,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for (int32_t i = 0; i < (*numOfAssignment); ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg == NULL){
if (pClientVg == NULL) {
continue;
}
SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
@ -3475,13 +3481,12 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
if (code != 0) {
goto end;
}
@ -3504,7 +3509,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg == NULL){
if (pClientVg == NULL) {
continue;
}
if (pClientVg->vgId != p->vgId) {
@ -3631,7 +3636,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
taosMemoryFree(sendInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tsem2_init(&pParam->sem, 0, 0) != 0){
if (tsem2_init(&pParam->sem, 0, 0) != 0) {
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
taosMemoryFree(pParam);
@ -3645,8 +3650,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->fp = tmqSeekCb;
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
(void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam);

View File

@ -56,13 +56,13 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
}
for (int32_t i = 0; i < taskNum; ++i) {
int32_t* taskId = taosArrayGet(cbParam->taskId, i);
int32_t* taskId = taosArrayGet(cbParam->taskId, i);
if (NULL == taskId) {
ctgError("taosArrayGet %d taskId failed, total:%d", i, (int32_t)taosArrayGetSize(cbParam->taskId));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i);
int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i);
if (NULL == msgIdx) {
ctgError("taosArrayGet %d msgIdx failed, total:%d", i, (int32_t)taosArrayGetSize(cbParam->msgIdx));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
@ -114,7 +114,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(void)(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode)); // error handled internal
(void)(*gCtgAsyncFps[pTask->type].handleRspFp)(
&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode)); // error handled internal
}
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
@ -417,12 +418,12 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
} else {
int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
if (NULL == taskId) {
ctgError("taosArrayGet %d taskId failed, total:%d", 0, (int32_t)taosArrayGetSize(cbParam->taskId));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (NULL == pTask) {
ctgError("taosArrayGet %d SCtgTask failed, total:%d", *taskId, (int32_t)taosArrayGetSize(pJob->pTasks));
@ -445,7 +446,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
ctgError("get task %d SCtgMsgCtx failed, taskType:%d", -1, pTask->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
pMsgCtx->pBatchs = pBatchs;
#endif
@ -526,8 +527,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob,
pMsgSendInfo->msgInfo.handle = NULL;
pMsgSendInfo->msgType = msgType;
int64_t transporterId = 0;
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo);
pMsgSendInfo = NULL;
if (code) {
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
@ -558,9 +558,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
ctgError("get task %d SCtgMsgCtx failed, taskType:%d", tReq->msgIdx, pTask->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SHashObj* pBatchs = pMsgCtx->pBatchs;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
SHashObj* pBatchs = pMsgCtx->pBatchs;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
if (NULL == pBatch) {
newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
@ -599,7 +599,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
} else if (CTG_TASK_GET_TB_TSMA == pTask->type){
} else if (CTG_TASK_GET_TB_TSMA == pTask->type) {
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
@ -616,10 +616,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
if (NULL == pFetch) {
ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx, (int32_t)taosArrayGetSize(pCtx->pFetches));
ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx,
(int32_t)taosArrayGetSize(pCtx->pFetches));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
if (NULL == pTbReq) {
ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
@ -675,7 +676,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
} else if (CTG_TASK_GET_TB_TSMA == pTask->type){
} else if (CTG_TASK_GET_TB_TSMA == pTask->type) {
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
@ -689,22 +690,23 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
pName = ctx->pName;
}
} else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) {
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
if (NULL == pFetch) {
ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx, (int32_t)taosArrayGetSize(pCtx->pFetches));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
if (NULL == pTbReq) {
ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
if (NULL == pName) {
ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
if (NULL == pFetch) {
ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx,
(int32_t)taosArrayGetSize(pCtx->pFetches));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
if (NULL == pTbReq) {
ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
if (NULL == pName) {
ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
} else {
ctgError("invalid vnode msgType %d", msgType);
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
@ -1629,9 +1631,9 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName*
}
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out,
SCtgTaskReq* tReq, int32_t reqType) {
char* msg = NULL;
int32_t msgLen = 0;
SCtgTaskReq* tReq, int32_t reqType) {
char* msg = NULL;
int32_t msgLen = 0;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
@ -1720,7 +1722,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
#else
char dbFName[TSDB_DB_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
(void)tNameGetFullDbName(pTbName, dbFName);
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
@ -1731,7 +1733,8 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen));
CTG_RET(
ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen));
#endif
}

View File

@ -61,14 +61,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
if (code) {
pInserter->submitRes.code = code;
}
if (code == TSDB_CODE_SUCCESS) {
pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
if (NULL == pInserter->submitRes.pRsp) {
pInserter->submitRes.code = terrno;
goto _return;
}
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
@ -108,7 +108,7 @@ _return:
(void)tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
return TSDB_CODE_SUCCESS;
}
@ -136,8 +136,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback;
int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
return asyncSendMsgToServer(pTransporter, pEpset, NULL, pMsgSendInfo);
}
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
@ -166,7 +165,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
} else {
taosMemoryFree(pBuf);
}
return code;
}
@ -228,7 +227,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end;
}
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_NCHAR:
@ -327,11 +326,11 @@ _end:
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return terrno;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
}
@ -458,7 +457,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->explain = pInserterNode->explain;
int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
&inserter->pSchema, &suid);
if (code) {
terrno = code;
goto _return;
@ -480,9 +480,9 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
false, HASH_NO_LOCK);
if (NULL == inserter->pCols) {
goto _return;
goto _return;
}
SNode* pNode = NULL;
int32_t i = 0;
FOREACH(pNode, pInserterNode->pCols) {

View File

@ -2118,8 +2118,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0;
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;

View File

@ -997,8 +997,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
pTask->lastMsgType = msgType;
}
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx);
pMsgSendInfo = NULL;
if (code) {
SCH_ERR_JRET(code);

View File

@ -114,6 +114,7 @@ typedef struct SExHandle {
void* handle;
int64_t refId;
void* pThrd;
int8_t pThrdIdx;
queue q;
int8_t inited;
SRWLatch latch;

View File

@ -96,9 +96,16 @@ typedef struct SCliConn {
int32_t seq;
int32_t shareCnt;
int8_t registered;
int8_t registered;
int8_t connnected;
SHashObj* pQTable;
} SCliConn;
typedef struct {
SCliConn* conn;
void* arg;
} SReqState;
typedef struct SCliReq {
SReqCtx* ctx;
STransMsg msg;
@ -146,6 +153,8 @@ typedef struct SCliThrd {
int32_t (*initCb)(void* arg, SCliReq* pReq, STransMsg* pResp);
int32_t (*notifyCb)(void* arg, SCliReq* pReq, STransMsg* pResp);
int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp);
SHashObj* pIdConnTable;
} SCliThrd;
typedef struct SCliObj {
@ -229,7 +238,6 @@ static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later
static void cliHandleResp(SCliConn* conn);
// handle except about conn
static void cliHandleExcept(SCliConn* conn, int32_t code);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
@ -257,6 +265,9 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn);
int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
@ -479,7 +490,7 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int8_t cliMayRecycleConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
if (transQueueSize(&conn->reqs) == 0) {
if (transQueueSize(&conn->reqs) == 0 && taosHashGetSize(conn->pQTable) == 0) {
(void)delConnFromHeapCache(pThrd->connHeapCache, conn);
addConnToPool(pThrd->pool, conn);
return 1;
@ -499,12 +510,41 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->info.seqNum = htonl(pHead->seqNum);
return 0;
}
int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qId = taosHton64(pHead->qid);
code = taosHashRemove(conn->pQTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("%s conn %p failed to release req %ld from conn", CONN_GET_INST_LABEL(conn), conn, qId);
}
code = taosHashRemove(pThrd->pIdConnTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId);
}
tDebug("%s conn %p release req %ld", CONN_GET_INST_LABEL(conn), conn, qId);
for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReqs = transQueueGet(&conn->reqs, i);
if (pReqs->msg.info.qId == qId) {
transQueueRm(&conn->reqs, i);
destroyReq(pReqs);
i--;
}
}
return 1;
}
return 0;
}
void cliHandleResp2(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
cliResetConnTimer(conn);
SCliReq* pReq = NULL;
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0);
@ -522,39 +562,39 @@ void cliHandleResp2(SCliConn* conn) {
return;
}
if (cliConnMayHandleReleasReq(conn, pHead)) {
if (cliMayRecycleConn(conn)) {
return;
}
return;
}
int64_t qId = taosHton64(pHead->qid);
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
SCliReq* pReq = NULL;
int32_t seq = htonl(pHead->seqNum);
int32_t seq = htonl(pHead->seqNum);
code = cliGetReqBySeq(conn, seq, &pReq);
if (code != 0) {
if (cliConnRmReleaseReq(conn, pHead)) {
return;
} else {
}
tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
tstrerror(code));
// TODO: notify cb
if (cliMayRecycleConn(conn)) {
return;
}
return;
}
// TODO handle release req
// if (cliRecvReleaseReq(conn, pHead)) {
// return;
// }
STransMsg resp = {0};
code = cliBuildRespFromCont(pReq, &resp, pHead);
STraceId* trace = &resp.info.traceId;
if (code != 0) {
tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
tGDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
} else {
tDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%ld", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
}
code = cliNotifyCb(conn, pReq, &resp);
@ -571,122 +611,6 @@ void cliHandleResp2(SCliConn* conn) {
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
// void cliHandleResp(SCliConn* conn) {
// int32_t code = 0;
// SCliThrd* pThrd = conn->hostThrd;
// STrans* pInst = pThrd->pInst;
// cliResetConnTimer(conn);
// STransMsgHead* pHead = NULL;
// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
// int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
// if (msgLen <= 0) {
// taosMemoryFree(pHead);
// tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
// // TODO: notify cb
// pThrd->notifyExceptCb(pThrd, NULL, NULL);
// return;
// }
// if (resetBuf == 0) {
// tTrace("%s conn %p not reset read buf", transLabel(pInst), conn);
// }
// if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) {
// tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
// // TODO: notify cb
// }
// pHead->code = htonl(pHead->code);
// pHead->msgLen = htonl(pHead->msgLen);
// if (cliRecvReleaseReq(conn, pHead)) {
// return;
// }
// STransMsg transMsg = {0};
// transMsg.contLen = transContLenFromMsg(pHead->msgLen);
// transMsg.pCont = transContFromHead((char*)pHead);
// transMsg.code = pHead->code;
// transMsg.msgType = pHead->msgType;
// transMsg.info.ahandle = NULL;
// transMsg.info.traceId = pHead->traceId;
// transMsg.info.hasEpSet = pHead->hasEpSet;
// transMsg.info.cliVer = htonl(pHead->compatibilityVer);
// SCliReq* pReq = NULL;
// SReqCtx* pCtx = NULL;
// if (CONN_NO_PERSIST_BY_APP(conn)) {
// pReq = transQueuePop(&conn->reqs);
// pCtx = pReq ? pReq->ctx : NULL;
// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
// tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
// } else {
// uint64_t ahandle = (uint64_t)pHead->ahandle;
// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
// if (pReq == NULL) {
// transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
// tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
// transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
// if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
// transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
// transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
// tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
// transMsg.info.ahandle);
// }
// } else {
// pCtx = pReq->ctx;
// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
// tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
// }
// }
// // buf's mem alread translated to transMsg.pCont
// if (!CONN_NO_PERSIST_BY_APP(conn)) {
// transMsg.info.handle = (void*)conn->refId;
// transMsg.info.refId = (int64_t)(void*)conn->refId;
// tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
// }
// STraceId* trace = &transMsg.info.traceId;
// tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
// TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
// if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transFreeMsg(transMsg.pCont);
// return;
// }
// if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transFreeMsg(transMsg.pCont);
// return;
// }
// if (pReq == NULL || (pReq && pReq->type != Release)) {
// if (cliNotifyCb(conn, pReq, &transMsg) != 0) {
// return;
// }
// }
// int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle));
// tDebug("conn %p msg refId: %" PRId64 "", conn, refId);
// destroyReq(pReq);
// if (cliConnSendSeqMsg(refId, conn)) {
// return;
// }
// if (cliMaySendCachedMsg(conn) == true) {
// return;
// }
// if (CONN_NO_PERSIST_BY_APP(conn)) {
// return addConnToPool(pThrd->pool, conn);
// }
// (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// }
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
// if (transQueueEmpty(&pConn->reqs)) {
// if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
@ -1266,12 +1190,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
if (pReq->msg.info.handle != 0) {
// SExHandle *p = transAcquireExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle);
// TAOS_CHECK_GOTO(specifyConnRef(pConn, false, pReq->msg.info.handle), NULL, _exception);
// } else {
// TAOS_CHECK_GOTO(allocConnRef(pConn, false), NULL, _exception);
}
code = cliMayUpdateState(pThrd, pReq, pConn);
transQueuePush(&pConn->reqs, pReq);
return cliDoConn(pThrd, pConn);
@ -1316,11 +1235,13 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
conn->broken = false;
transRefCliHandle(conn);
conn->seq = 0;
conn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (conn->pQTable == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _failed);
}
TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed);
TAOS_CHECK_GOTO(cliGetConnTimer(pThrd, conn), &lino, _failed);
@ -1347,6 +1268,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
_failed:
if (conn) {
taosMemoryFree(conn->stream);
taosHashCleanup(conn->pQTable);
(void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqs);
taosMemoryFree(conn->dstAddr);
@ -1405,15 +1327,15 @@ static void cliDestroy(uv_handle_t* handle) {
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
}
delConnFromHeapCache(pThrd->connHeapCache, conn);
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream);
cliDestroyConnMsgs(conn, true);
delConnFromHeapCache(pThrd->connHeapCache, conn);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue);
taosHashCleanup(conn->pQTable);
(void)transDestroyBuffer(&conn->readBuf);
taosMemoryFree(conn);
@ -1457,6 +1379,7 @@ static void cliConnRmReqs(SCliConn* conn) {
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
transQueueRm(&conn->reqs, i);
destroyReq(pReq);
i--;
}
}
}
@ -1536,6 +1459,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->seqNum = htonl(pConn->seq);
pHead->qid = taosHton64(pReq->info.qId);
if (pHead->comp == 0) {
if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
@ -1552,8 +1476,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
pCliMsg->seq = pConn->seq;
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq);
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
}
if (j == 0) {
taosMemoryFree(wb);
@ -1571,7 +1495,11 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
transQueuePush(&pConn->reqs, pCliMsg);
code = cliSend2(pConn);
if (pConn->connnected) {
code = cliSend2(pConn);
} else {
// do nothing
}
return code;
}
@ -1734,6 +1662,7 @@ void cliConnCb(uv_connect_t* req, int status) {
// }
return;
}
pConn->connnected = 1;
cliConnSetSockInfo(pConn);
@ -2012,40 +1941,74 @@ int32_t cliConnHandleQueryById(SCliReq* pReq) {
int64_t queryId = (int64_t)pReq->msg.info.handle;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), queryId);
if (exh->inited == 1) {
} else {
}
transReleaseExHandle(transGetRefMgt(), queryId);
}
return 0;
}
int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
if (qid == 0) {
return TSDB_CODE_RPC_NO_STATE;
}
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState == NULL) {
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else {
*pConn = pState->conn;
}
return code;
}
int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
if (qid == 0) {
return TSDB_CODE_RPC_NO_STATE;
}
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState != 0) {
ASSERT(0);
}
SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
return code;
}
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
int32_t lino = 0;
STransMsg resp = {0};
int32_t code = (pThrd->initCb)(pThrd, pReq, NULL);
TAOS_CHECK_GOTO(code, &lino, _exception);
char addr[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
STrans* pInst = pThrd->pInst;
SCliConn* pConn = NULL;
code = cliMayGetHandleState(pThrd, pReq, &pConn);
pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
if (pConn == NULL) {
code = cliGetOrCreateConn(pThrd, pReq, &pConn);
if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
TAOS_CHECK_GOTO(code, &lino, _exception);
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// do nothing, notiy
return;
} else {
ASSERT(code == 0);
addConnToHeapCache(pThrd->connHeapCache, pConn);
if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
char addr[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
if (pConn == NULL) {
code = cliGetOrCreateConn(pThrd, pReq, &pConn);
if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
TAOS_CHECK_GOTO(code, &lino, _exception);
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// do nothing, notiy
return;
} else {
ASSERT(code == 0);
addConnToHeapCache(pThrd->connHeapCache, pConn);
}
}
}
code = cliMayUpdateState(pThrd, pReq, pConn);
code = cliSendReq(pConn, pReq);
tTrace("%s conn %p ready", pInst->label, pConn);
@ -2550,6 +2513,11 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->pIdConnTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pThrd->connHeapCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->initCb = initCb;
pThrd->notifyCb = notfiyCb;
pThrd->notifyExceptCb = notifyExceptCb;
@ -2576,6 +2544,7 @@ _end:
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->pIdConnTable);
taosMemoryFree(pThrd);
}
@ -2632,6 +2601,8 @@ static void destroyThrdObj(SCliThrd* pThrd) {
}
taosHashCleanup(pThrd->connHeapCache);
taosHashCleanup(pThrd->pIdConnTable);
taosMemoryFree(pThrd);
}
@ -3111,24 +3082,26 @@ int32_t transReleaseCliHandle(void* handle) {
return TSDB_CODE_RPC_BROKEN_LINK;
}
STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE,
.info.handle = handle,
.info.ahandle = (void*)0x9527,
.info.qId = (int64_t)handle};
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx));
if (pCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCtx->ahandle = tmsg.info.ahandle;
SCliReq* cmsg = taosMemoryCalloc(1, sizeof(SCliReq));
if (cmsg == NULL) {
taosMemoryFree(pCtx);
return TSDB_CODE_OUT_OF_MEMORY;
}
cmsg->msg = tmsg;
cmsg->st = taosGetTimestampUs();
cmsg->type = Release;
cmsg->type = Normal;
cmsg->ctx = pCtx;
STraceId* trace = &tmsg.info.traceId;
@ -3188,31 +3161,7 @@ int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq,
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception);
}
if (handle != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh != NULL) {
taosWLockLatch(&exh->latch);
if (exh->handle == NULL && exh->inited != 0) {
SCliReq* pCliMsg = NULL;
code = transInitMsg(pInstRef, pEpSet, pReq, ctx, &pCliMsg);
if (code != 0) {
taosWUnLockLatch(&exh->latch);
(void)transReleaseExHandle(transGetRefMgt(), handle);
TAOS_CHECK_GOTO(code, NULL, _exception);
}
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
taosWUnLockLatch(&exh->latch);
tDebug("msg refId: %" PRId64 "", handle);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return 0;
} else {
exh->inited = 1;
taosWUnLockLatch(&exh->latch);
(void)transReleaseExHandle(transGetRefMgt(), handle);
}
}
}
pReq->info.qId = handle;
SCliReq* pCliMsg = NULL;
TAOS_CHECK_GOTO(transInitMsg(pInstRef, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception);
@ -3259,6 +3208,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
}
pReq->info.handle = (void*)(*transpointId);
pReq->info.qId = *transpointId;
SCliReq* pCliMsg = NULL;
TAOS_CHECK_GOTO(transInitMsg(pInstRef, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception);
@ -3597,6 +3547,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) {
tDebug("release conn id %" PRId64 "", transpointId);
STransMsg msg = {.info.handle = (void*)transpointId};
msg.info.qId = transpointId;
pCli->msg = msg;
code = transAsyncSend(pThrd->asyncPool, &pCli->q);

View File

@ -71,7 +71,7 @@ typedef struct SSvrMsg {
int32_t seqNum;
void* arg;
FilteFunc func;
int8_t sent;
} SSvrMsg;
typedef struct {
@ -423,16 +423,40 @@ static int8_t uvValidConn(SSvrConn* pConn) {
}
return forbiddenIp;
}
static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) {
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
tDebug("conn %p acquired by server app", pConn);
} else if (pHead->noResp == 0) {
transRefSrvHandle(pConn);
static int32_t uvHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
int32_t code = 0;
STrans* pInst = pConn->pInst;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qId = taosHton64(pHead->qid);
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
if (p == NULL) {
code = TSDB_CODE_RPC_NO_STATE;
tTrace("conn %p recv release, and releady release by server qid%ld", pConn, qId);
// notify cli already release, cli release resouce
} else {
SSvrRegArg* arg = p;
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p recv release, notify server app, qid%ld", pConn, qId);
(void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
}
STransMsg tmsg = {.code = code,
.msgType = pHead->msgType + 1,
.info.qId = qId,
.info.traceId = pHead->traceId,
.info.seqNum = htonl(pHead->seqNum)};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
srvMsg->msg = tmsg;
srvMsg->type = Normal;
srvMsg->pConn = pConn;
transQueuePush(&pConn->srvMsgs, srvMsg);
uvStartSendRespImpl(srvMsg);
return 1;
}
return 0;
}
static bool uvHandleReq(SSvrConn* pConn) {
STrans* pInst = pConn->pInst;
@ -440,8 +464,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
STransMsgHead* pHead = NULL;
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
int8_t resetBuf = 0;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, 0);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pInst), pConn);
return false;
@ -469,7 +493,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
}
}
if (uvRecvReleaseReq(pConn, pHead)) {
if (uvHandleReleaseReq(pConn, pHead)) {
return true;
}
@ -478,14 +502,14 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
transMsg.info.qId = htole64(pHead->qid);
transMsg.info.qId = taosHton64(pHead->qid);
if (transMsg.info.qId > 0) {
int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg));
if (code != 0) {
tError("%s conn %p failed to put msg to req dict, since %s", transLabel(pInst), pConn, tstrerror(code));
return false;
}
// int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg));
// if (code != 0) {
// tError("%s conn %p failed to put msg to req dict, since %s", transLabel(pInst), pConn, tstrerror(code));
// return false;
// }
}
if (pHead->seqNum == 0) {
@ -595,49 +619,21 @@ void uvOnSendCb(uv_write_t* req, int status) {
if (conn == NULL) return;
if (status == 0) {
tTrace("conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) {
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
STraceId* trace = &msg->msg.info.traceId;
tGDebug("conn %p write data out", conn);
destroySmsg(msg);
// send cached data
if (!transQueueEmpty(&conn->srvMsgs)) {
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) {
if (conn->regArg.init) {
transFreeMsg(conn->regArg.msg.pCont);
conn->regArg.init = 0;
}
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
if (conn->broken) {
STrans* pInst = conn->pInst;
(pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
(void)transQueuePop(&conn->srvMsgs);
taosMemoryFree(msg);
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg != NULL) {
uvStartSendRespImpl(msg);
}
} else {
uvStartSendRespImpl(msg);
}
for (int32_t i = 0; i < transQueueSize(&conn->srvMsgs); i++) {
SSvrMsg* smsg = transQueueGet(&conn->srvMsgs, i);
if (smsg->sent == 1) {
transQueueRm(&conn->srvMsgs, i);
destroySmsg(smsg);
i--;
}
}
transUnrefSrvHandle(conn);
} else {
if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) {
tError("conn %p failed to write data, %s", conn, uv_err_name(status));
conn->broken = true;
transUnrefSrvHandle(conn);
}
}
transUnrefSrvHandle(conn);
}
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
@ -662,7 +658,6 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
if (pMsg->pCont == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
@ -673,30 +668,18 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer);
pHead->version = TRANS_VER;
pHead->seqNum = htonl(pMsg->info.seqNum);
pHead->qid = taosHton64(pMsg->info.qId);
// handle invalid drop_task resp, TD-20098
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
(void)transQueuePop(&pConn->srvMsgs);
destroySmsg(smsg);
return TSDB_CODE_INVALID_MSG;
ASSERT(0);
// (void)transQueuePop(&pConn->srvMsgs);
// destroySmsg(smsg);
// return TSDB_CODE_INVALID_MSG;
}
// if (pConn->status == ConnNormal) {
// pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
// if (smsg->type == Release) pHead->msgType = 0;
// } else {
// if (smsg->type == Release) {
// pHead->msgType = 0;
// pConn->status = ConnNormal;
// destroyConnRegArg(pConn);
// transUnrefSrvHandle(pConn);
// } else {
// // set up resp msg type
// pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
// }
// }
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
// pHead->msgType = pMsg->msgType;
pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code);
pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead));
@ -712,22 +695,60 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
}
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType),
pConn->dst, pConn->src, len);
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%d, qid:%ld", transLabel(pInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId);
wb->base = (char*)pHead;
wb->len = len;
return 0;
}
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum) {
int32_t count = 0;
int32_t size = transQueueSize(&pConn->srvMsgs);
uv_buf_t* pWb = taosMemoryCalloc(size, sizeof(uv_buf_t));
if (pWb == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < size; i++) {
SSvrMsg* pMsg = transQueueGet(&pConn->srvMsgs, i);
if (pMsg->sent == 1) {
continue;
}
uv_buf_t wb;
(void)uvPrepareSendData(pMsg, &wb);
pWb[count] = wb;
pMsg->sent = 1;
count++;
}
if (count == 0) {
taosMemoryFree(pWb);
return 0;
}
*bufNum = count;
*ppBuf = pWb;
return 0;
}
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
int32_t code = 0;
SSvrConn* pConn = smsg->pConn;
if (pConn->broken) {
return;
}
uv_buf_t wb;
if (uvPrepareSendData(smsg, &wb) < 0) {
uv_buf_t* pBuf = NULL;
int32_t bufNum = 0;
code = uvBuildToSendData(pConn, &pBuf, &bufNum);
if (code != 0) {
tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn);
return;
}
if (bufNum == 0) {
return;
}
@ -741,24 +762,33 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
return;
}
}
(void)uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
(void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
taosMemoryFree(pBuf);
}
int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) {
SSvrConn* pConn = pMsg->pConn;
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qid = pMsg->msg.info.qId;
SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (p == NULL) {
tError("%s conn %p already release qid %ld", transLabel(pConn->pInst), pConn, qid);
return TSDB_CODE_RPC_NO_STATE;
} else {
transFreeMsg(p->msg.pCont);
taosHashRemove(pConn->pQTable, &qid, sizeof(qid));
}
}
return 0;
}
static void uvStartSendResp(SSvrMsg* smsg) {
// impl
SSvrConn* pConn = smsg->pConn;
if (pConn->broken == true) {
// persist by
if (uvConnMayHandlsReleaseMsg(smsg) == TSDB_CODE_RPC_NO_STATE) {
destroySmsg(smsg);
transUnrefSrvHandle(pConn);
return;
}
if (pConn->status == ConnNormal) {
transUnrefSrvHandle(pConn);
}
if (!transQueuePush(&pConn->srvMsgs, smsg)) {
return;
}
transQueuePush(&pConn->srvMsgs, smsg);
uvStartSendRespImpl(smsg);
return;
}
@ -1199,7 +1229,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
pConn->pQTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pConn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pConn->pQTable == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
@ -1540,19 +1570,20 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) {
taosMemoryFree(msg);
}
void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
int32_t code = 0;
SSvrConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
uvStartSendRespImpl(msg);
return;
} else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn);
}
ASSERT(0);
// int32_t code = 0;
// SSvrConn* conn = msg->pConn;
// if (conn->status == ConnAcquire) {
// if (!transQueuePush(&conn->srvMsgs, msg)) {
// return;
// }
// uvStartSendRespImpl(msg);
// return;
// } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
// tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn);
// }
destroySmsg(msg);
// destroySmsg(msg);
}
void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) {
// send msg to client
@ -1697,6 +1728,7 @@ int32_t transReleaseSrvHandle(void* handle) {
int32_t code = 0;
SRpcHandleInfo* info = handle;
SExHandle* exh = info->handle;
int64_t qId = info->qId;
int64_t refId = info->refId;
ASYNC_CHECK_HANDLE(info->refIdMgt, refId, exh);
@ -1704,7 +1736,8 @@ int32_t transReleaseSrvHandle(void* handle) {
SWorkThrd* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
STransMsg tmsg = {
.msgType = TDMT_SCH_TASK_RELEASE, .code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
if (m == NULL) {
@ -1713,9 +1746,10 @@ int32_t transReleaseSrvHandle(void* handle) {
}
m->msg = tmsg;
m->type = Release;
m->type = Normal;
tDebug("%s conn %p start to release", transLabel(pThrd->pInst), exh->handle);
tDebug("%s conn %p start to %p, qId:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
qId);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(info->refIdMgt, refId);

View File

@ -60,6 +60,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit"
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")