diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 572e26bc62..ff524faefd 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -323,6 +323,7 @@ TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_TASK_RELEASE, "task-release", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_SCH_MSG) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a084f7b2f5..534f7d0505 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -93,6 +93,10 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026) #define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) #define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028) +#define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029) + + + diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6b8bf88030..008338d043 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -335,8 +335,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { STscObj* pTscObj = pRequest->pTscObj; SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); - int64_t transporterId = 0; - TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg)); + TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg)); (void)tsem_wait(&pRequest->body.rspSem); return TSDB_CODE_SUCCESS; } @@ -392,8 +391,7 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SAppInstInfo* pAppInfo = getAppInfo(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); + int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg); if (code) { doRequestCallback(pRequest, code); } @@ -1557,9 +1555,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta return code; } - int64_t transporterId = 0; - code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, - body); + // int64_t transporterId = 0; + code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body); if (TSDB_CODE_SUCCESS != code) { destroyTscObj(*pTscObj); tscError("failed to send connect msg to server, code:%s", tstrerror(code)); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index de56a4844a..59351d66d7 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -16,19 +16,19 @@ #include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "clientStmt.h" #include "clientMonitor.h" +#include "clientStmt.h" #include "functionMgt.h" #include "os.h" #include "query.h" #include "scheduler.h" +#include "tcompare.h" #include "tdatablock.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "version.h" -#include "tcompare.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -120,7 +120,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha } STscObj *pObj = NULL; - int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj); + int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj); if (TSDB_CODE_SUCCESS == code) { int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); if (NULL == rid) { @@ -183,15 +183,15 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) return 0; } -typedef struct SFetchWhiteListInfo{ - int64_t connId; +typedef struct SFetchWhiteListInfo { + int64_t connId; __taos_async_whitelist_fn_t userCbFn; - void* userParam; + void *userParam; } SFetchWhiteListInfo; -int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { - SFetchWhiteListInfo* pInfo = (SFetchWhiteListInfo*)param; - TAOS* taos = &pInfo->connId; +int32_t fetchWhiteListCallbackFn(void *param, SDataBuf *pMsg, int32_t code) { + SFetchWhiteListInfo *pInfo = (SFetchWhiteListInfo *)param; + TAOS *taos = &pInfo->connId; if (code != TSDB_CODE_SUCCESS) { pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL); taosMemoryFree(pMsg->pData); @@ -209,7 +209,7 @@ int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { return terrno; } - uint64_t* pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t)); + uint64_t *pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t)); if (pWhiteLists == NULL) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -238,7 +238,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa return; } - int64_t connId = *(int64_t*)taos; + int64_t connId = *(int64_t *)taos; STscObj *pTsc = acquireTscObj(connId); if (NULL == pTsc) { @@ -255,7 +255,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa return; } - void* pReq = taosMemoryMalloc(msgLen); + void *pReq = taosMemoryMalloc(msgLen); if (pReq == NULL) { fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL); releaseTscObj(connId); @@ -269,7 +269,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa return; } - SFetchWhiteListInfo* pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo)); + SFetchWhiteListInfo *pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo)); if (pParam == NULL) { fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL); taosMemoryFree(pReq); @@ -280,9 +280,9 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa pParam->connId = connId; pParam->userCbFn = fp; pParam->userParam = param; - SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo *pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pSendInfo == NULL) { - fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL); + fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL); taosMemoryFree(pParam); taosMemoryFree(pReq); releaseTscObj(connId); @@ -296,9 +296,8 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa pSendInfo->fp = fetchWhiteListCallbackFn; pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST; - int64_t transportId = 0; SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp); - if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) { + if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) { tscWarn("failed to async send msg to server"); } releaseTscObj(connId); @@ -443,7 +442,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - if(pRequest->inCallback) { + if (pRequest->inCallback) { tscError("can not call taos_fetch_row before query callback ends."); terrno = TSDB_CODE_TSC_INVALID_OPERATION; return NULL; @@ -454,7 +453,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo = NULL; if (msg->common.resIter == -1) { - if(tmqGetNextResInfo(res, true, &pResultInfo) != 0){ + if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) { return NULL; } } else { @@ -466,7 +465,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pResultInfo->current += 1; return pResultInfo->row; } else { - if (tmqGetNextResInfo(res, true, &pResultInfo) != 0){ + if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) { return NULL; } @@ -540,22 +539,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) len += sprintf(str + len, "%lf", dv); } break; - case TSDB_DATA_TYPE_VARBINARY:{ - void* data = NULL; + case TSDB_DATA_TYPE_VARBINARY: { + void *data = NULL; uint32_t size = 0; - int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE); - if(taosAscii2Hex(row[i], charLen, &data, &size) < 0){ + int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE); + if (taosAscii2Hex(row[i], charLen, &data, &size) < 0) { break; } (void)memcpy(str + len, data, size); len += size; taosMemoryFree(data); - }break; + } break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_GEOMETRY: { int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE); - if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY || fields[i].type == TSDB_DATA_TYPE_GEOMETRY) { + if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY || + fields[i].type == TSDB_DATA_TYPE_GEOMETRY) { if (charLen > fields[i].bytes || charLen < 0) { tscError("taos_print_row error binary. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes); break; @@ -664,7 +664,8 @@ const char *taos_get_client_info() { return version; } // return int32_t int taos_affected_rows(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || + TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -675,7 +676,8 @@ int taos_affected_rows(TAOS_RES *res) { // return int64_t int64_t taos_affected_rows64(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || + TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -725,7 +727,8 @@ int taos_select_db(TAOS *taos, const char *db) { } void taos_stop_query(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || + TD_RES_TMQ_BATCH_META(res)) { return; } @@ -784,7 +787,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { return pRequest->code; } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SReqResultInfo *pResultInfo = NULL; - int32_t code = tmqGetNextResInfo(res, true, &pResultInfo); + int32_t code = tmqGetNextResInfo(res, true, &pResultInfo); if (code != 0) return code; pResultInfo->current = pResultInfo->numOfRows; @@ -807,7 +810,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SReqResultInfo *pResultInfo = NULL; - int32_t code = tmqGetNextResInfo(res, false, &pResultInfo); + int32_t code = tmqGetNextResInfo(res, false, &pResultInfo); if (code != 0) { (*numOfRows) = 0; return 0; @@ -953,7 +956,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t (void)memcpy(&pRequest->parseMeta, pResultMeta, sizeof(*pResultMeta)); (void)memset(pResultMeta, 0, sizeof(*pResultMeta)); } - + handleQueryAnslyseRes(pWrapper, pResultMeta, code); } @@ -999,7 +1002,7 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult } pNewRequest->pQuery = NULL; - code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pNewRequest->pQuery); + code = nodesMakeNode(QUERY_NODE_QUERY, (SNode **)&pNewRequest->pQuery); if (pNewRequest->pQuery) { pNewRequest->pQuery->pRoot = pRoot; pRoot = NULL; @@ -1271,7 +1274,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); - (void)refreshMeta(pRequest->pTscObj, pRequest); //ignore return code,try again + (void)refreshMeta(pRequest->pTscObj, pRequest); // ignore return code,try again pRequest->prevCode = code; doAsyncQuery(pRequest, true); return; @@ -1285,7 +1288,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest); - SRequestObj* pUserReq = pRequest; + SRequestObj *pUserReq = pRequest; (void)acquireRequest(pRequest->self); while (pUserReq) { if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) { @@ -1631,7 +1634,6 @@ TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) { return pStmt; } - int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { if (stmt == NULL || sql == NULL) { tscError("NULL parameter for %s", __FUNCTION__); @@ -1874,7 +1876,7 @@ int taos_stmt_close(TAOS_STMT *stmt) { return stmtClose(stmt); } -int taos_set_conn_mode(TAOS* taos, int mode, int value) { +int taos_set_conn_mode(TAOS *taos, int mode, int value) { if (taos == NULL) { terrno = TSDB_CODE_INVALID_PARA; return terrno; @@ -1897,6 +1899,4 @@ int taos_set_conn_mode(TAOS* taos, int mode, int value) { return 0; } -char* getBuildInfo(){ - return buildinfo; -} +char *getBuildInfo() { return buildinfo; } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 4bb29f8d97..d7b169399c 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } MonitorSlowLogData tmp = {.clusterId = p->clusterId, - .type = p->type, - .fileName = p->fileName, - .pFile = p->pFile, - .offset = p->offset, - .data = NULL}; + .type = p->type, + .fileName = p->fileName, + .pFile = p->pFile, + .offset = p->offset, + .data = NULL}; if (monitorPutData2MonitorQueue(tmp) == 0) { p->fileName = NULL; } @@ -161,10 +161,9 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; - int64_t transporterId = 0; - return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); + return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo); - FAILED: +FAILED: monitorFreeSlowLogDataEx(param); return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); } @@ -279,7 +278,7 @@ void monitorCreateClient(int64_t clusterId) { return; - fail: +fail: destroyMonitorClient(&pMonitor); taosWUnLockLatch(&monitorLock); } @@ -295,7 +294,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); if (newCounter == NULL) return; MonitorClient* pMonitor = *ppMonitor; - if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){ + if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) { tscError("failed to add metric to collector"); (void)taos_counter_destroy(newCounter); goto end; @@ -308,7 +307,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); - end: +end: taosWUnLockLatch(&monitorLock); } @@ -331,13 +330,13 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); goto end; } - if (taos_counter_inc(*ppCounter, label_values) != 0){ + if (taos_counter_inc(*ppCounter, label_values) != 0) { tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName); goto end; } tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); - end: +end: taosWUnLockLatch(&monitorLock); } @@ -495,7 +494,7 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs } static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { - if (fileName == NULL){ + if (fileName == NULL) { return; } int64_t size = getFileSize(*fileName); @@ -504,10 +503,11 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); } else { int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); - if (code == 0){ + if (code == 0) { tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId); - }else{ - tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code); + } else { + tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, + code); } *fileName = NULL; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c9f166e565..9d5a8a111f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -37,7 +37,7 @@ struct SMqMgmt { static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once volatile int32_t tmqInitRes = 0; // initialize rsp code static struct SMqMgmt tmqMgmt = {0}; -static int8_t pollFlag = 0; +static int8_t pollFlag = 0; typedef struct { int32_t code; @@ -121,7 +121,7 @@ struct tmq_t { typedef struct SAskEpInfo { int32_t code; - tsem2_t sem; + tsem2_t sem; } SAskEpInfo; enum { @@ -191,7 +191,7 @@ typedef struct { } SMqPollRspWrapper; typedef struct { - tsem2_t rspSem; + tsem2_t rspSem; int32_t rspErr; } SMqSubscribeCbParam; @@ -219,12 +219,12 @@ typedef struct SMqVgCommon { } SMqVgCommon; typedef struct SMqSeekParam { - tsem2_t sem; + tsem2_t sem; int32_t code; } SMqSeekParam; typedef struct SMqCommittedParam { - tsem2_t sem; + tsem2_t sem; int32_t code; SMqVgOffset vgOffset; } SMqCommittedParam; @@ -242,18 +242,18 @@ typedef struct { int32_t waitingRspNum; int32_t code; tmq_commit_cb* callbackFn; - void* userParam; + void* userParam; } SMqCommitCbParamSet; typedef struct { SMqCommitCbParamSet* params; - char topicName[TSDB_TOPIC_FNAME_LEN]; - int32_t vgId; - int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + int32_t vgId; + int64_t consumerId; } SMqCommitCbParam; typedef struct SSyncCommitInfo { - tsem2_t sem; + tsem2_t sem; int32_t code; } SSyncCommitInfo; @@ -334,7 +334,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcasecmp(key, "session.timeout.ms") == 0) { int64_t tmp = taosStr2int64(value); - if (tmp < 6000 || tmp > 1800000){ + if (tmp < 6000 || tmp > 1800000) { return TMQ_CONF_INVALID; } conf->sessionTimeoutMs = tmp; @@ -343,7 +343,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcasecmp(key, "heartbeat.interval.ms") == 0) { int64_t tmp = taosStr2int64(value); - if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){ + if (tmp < 1000 || tmp >= conf->sessionTimeoutMs) { return TMQ_CONF_INVALID; } conf->heartBeatIntervalMs = tmp; @@ -352,7 +352,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcasecmp(key, "max.poll.interval.ms") == 0) { int64_t tmp = taosStr2int64(value); - if (tmp < 1000 || tmp > INT32_MAX){ + if (tmp < 1000 || tmp > INT32_MAX) { return TMQ_CONF_INVALID; } conf->maxPollIntervalMs = tmp; @@ -515,7 +515,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse SEncoder encoder = {0}; tEncoderInit(&encoder, abuf, len); - if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) { + if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) { tEncoderClear(&encoder); taosMemoryFree(buf); return TSDB_CODE_INVALID_PARA; @@ -552,9 +552,8 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; - int64_t transporterId = 0; (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo); if (code != 0) { (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); return code; @@ -562,7 +561,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse return code; } -static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic **topic) { +static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); for (int32_t i = 0; i < numOfTopics; ++i) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -577,8 +576,8 @@ static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic return TSDB_CODE_TMQ_INVALID_TOPIC; } -static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, - int32_t rspNum, SMqCommitCbParamSet** ppParamSet) { +static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum, + SMqCommitCbParamSet** ppParamSet) { SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -595,7 +594,7 @@ static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, voi static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) { SMqClientTopic* pTopic = NULL; - int32_t code = getTopicByName(tmq, pTopicName, &pTopic); + int32_t code = getTopicByName(tmq, pTopicName, &pTopic); if (code != 0) { tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); return code; @@ -723,7 +722,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us taosRUnLockLatch(&tmq->lock); goto end; } - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { @@ -769,7 +768,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us if (pParamSet->waitingRspNum != 1) { // count down since waiting rsp num init as 1 code = commitRspCountDown(pParamSet, tmq->consumerId, "", 0); - if (code != 0){ + if (code != 0) { tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); pParamSet = NULL; goto end; @@ -824,14 +823,14 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (code != 0){ + if (code != 0) { goto _return; } if (pMsg == NULL || param == NULL) { code = TSDB_CODE_INVALID_PARA; goto _return; } - + SMqHbRsp rsp = {0}; code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); if (code != 0) { @@ -858,7 +857,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { taosWUnLockLatch(&tmq->lock); (void)taosReleaseRef(tmqMgmt.rsetId, refId); } - + tDestroySMqHbRsp(&rsp); _return: @@ -881,32 +880,32 @@ void tmqSendHbReq(void* param, void* tmrId) { req.epoch = tmq->epoch; req.pollFlag = atomic_load_8(&pollFlag); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); - if (req.topics == NULL){ + if (req.topics == NULL) { return; } taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); if (pTopic == NULL) { continue; } int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); TopicOffsetRows* data = taosArrayReserve(req.topics, 1); - if (data == NULL){ + if (data == NULL) { continue; } (void)strcpy(data->topicName, pTopic->topicName); data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); - if (data->offsetRows == NULL){ + if (data->offsetRows == NULL) { continue; } for (int j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg == NULL){ + if (pVg == NULL) { continue; } - OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); - if (offRows == NULL){ + OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); + if (offRows == NULL) { continue; } offRows->vgId = pVg->vgId; @@ -955,8 +954,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { tscError("tmqSendHbReq asyncSendMsgToServer failed"); } @@ -964,7 +962,7 @@ void tmqSendHbReq(void* param, void* tmrId) { (void)atomic_val_compare_exchange_8(&pollFlag, 1, 0); OVER: tDestroySMqHbReq(&req); - if(tmrId != NULL){ + if (tmrId != NULL) { (void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); } (void)taosReleaseRef(tmqMgmt.rsetId, refId); @@ -1006,14 +1004,15 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { continue; } tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); - (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); + (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, + &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, - &pTmq->commitTimer); + &pTmq->commitTimer); } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } @@ -1100,16 +1099,16 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); - if(topic == NULL) { + if (topic == NULL) { tscError("topic is null"); continue; } char* tmp = strchr(topic->topicName, '.'); - if(tmp == NULL) { + if (tmp == NULL) { tscError("topic name is invalid:%s", topic->topicName); continue; } - if(tmq_list_append(*topics, tmp+ 1) != 0) { + if (tmq_list_append(*topics, tmp + 1) != 0) { tscError("failed to append topic:%s", tmp + 1); continue; } @@ -1227,27 +1226,31 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { } code = taosOpenQueue(&pTmq->mqueue); if (code) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + pTmq->groupId); SET_ERROR_MSG_TMQ("open queue failed") goto _failed; } code = taosOpenQueue(&pTmq->delayedTask); if (code) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + pTmq->groupId); SET_ERROR_MSG_TMQ("open delayed task queue failed") goto _failed; } code = taosAllocateQall(&pTmq->qall); if (code) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + pTmq->groupId); SET_ERROR_MSG_TMQ("allocate qall failed") goto _failed; } if (conf->groupId[0] == 0) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + pTmq->groupId); SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty") goto _failed; } @@ -1287,8 +1290,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // init semaphore if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) { - tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, tstrerror(TAOS_SYSTEM_ERROR(errno)), - pTmq->groupId); + tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, + tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId); SET_ERROR_MSG_TMQ("init t_sem failed") goto _failed; } @@ -1371,7 +1374,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SName name = {0}; code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); if (code) { - tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, code); + tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, + code); goto FAIL; } char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); @@ -1382,7 +1386,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { code = tNameExtractFullName(&name, topicFName); if (code) { - tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, code); + tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, + code); taosMemoryFree(topicFName); goto FAIL; } @@ -1429,8 +1434,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - int64_t transporterId = 0; - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { goto FAIL; } @@ -1459,7 +1463,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); - tmq->commitTimer =taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); + tmq->commitTimer = + taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { code = TSDB_CODE_TSC_INTERNAL_ERROR; goto FAIL; @@ -1516,12 +1521,12 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) { } int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { - tmq_t* tmq = NULL; + tmq_t* tmq = NULL; SMqPollRspWrapper* pRspWrapper = NULL; - int8_t rspType = 0; - int32_t vgId = 0; - uint64_t requestId = 0; - SMqPollCbParam* pParam = (SMqPollCbParam*)param; + int8_t rspType = 0; + int32_t vgId = 0; + uint64_t requestId = 0; + SMqPollCbParam* pParam = (SMqPollCbParam*)param; if (pMsg == NULL) { return TSDB_CODE_TSC_INTERNAL_ERROR; } @@ -1530,7 +1535,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFreeClear(pMsg->pEpSet); return TSDB_CODE_TSC_INTERNAL_ERROR; } - int64_t refId = pParam->refId; + int64_t refId = pParam->refId; vgId = pParam->vgId; requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); @@ -1621,18 +1626,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } tDecoderClear(&decoder); (void)memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead)); - tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId,requestId); + tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId, + requestId); } else { // invalid rspType tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } END: - if (pRspWrapper){ + if (pRspWrapper) { pRspWrapper->code = code; pRspWrapper->vgId = vgId; (void)strcpy(pRspWrapper->topicName, pParam->topicName); code = taosWriteQitem(tmq->mqueue, pRspWrapper); - if(code != 0){ + if (code != 0) { tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); } } @@ -1676,7 +1682,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic } for (int32_t j = 0; j < vgNumGet; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - if (pVgEp == NULL){ + if (pVgEp == NULL) { continue; } (void)sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId); @@ -1712,7 +1718,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.committedOffset = offsetNew; clientVg.offsetInfo.beginOffset = offsetNew; } - if (taosArrayPush(pTopic->vgs, &clientVg) == NULL){ + if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) { tscError("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId, pTopic->topicName); freeClientVg(&clientVg); @@ -1773,7 +1779,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; - if(taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0){ + if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) { tscError("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId); } } @@ -1787,7 +1793,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) continue; } initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq); - if(taosArrayPush(newTopics, &topic) == NULL){ + if (taosArrayPush(newTopics, &topic) == NULL) { tscError("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName); freeClientTopic(&topic); } @@ -1919,7 +1925,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable pDataRsp->withSchema = true; pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*)); - if (pDataRsp->blockSchema == NULL){ + if (pDataRsp->blockSchema == NULL) { tscError("failed to allocate memory for blockSchema"); return; } @@ -1938,7 +1944,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); if (schema) { - if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL){ + if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) { tscError("failed to push schema into blockSchema"); continue; } @@ -1947,7 +1953,8 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg } } -int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj** ppRspObj) { +int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, + SMqRspObj** ppRspObj) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); if (pRspObj == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1959,7 +1966,8 @@ int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, in return 0; } -int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqTaosxRspObj** ppRspObj) { +int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, + SMqTaosxRspObj** ppRspObj) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); if (pRspObj == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2026,10 +2034,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->fp = tmqPollCb; sendInfo->msgType = TDMT_VND_TMQ_CONSUME; - int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); - code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); if (code != 0) { @@ -2056,10 +2063,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (pTopic == NULL){ + if (pTopic == NULL) { continue; } - int32_t numOfVg = taosArrayGetSize(pTopic->vgs); + int32_t numOfVg = taosArrayGetSize(pTopic->vgs); if (pTopic->noPrivilege) { tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName); continue; @@ -2069,7 +2076,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { if (pVg == NULL) { continue; } - int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; + int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); @@ -2220,8 +2227,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pVg->blockSleepForReplay = pRsp->rsp.sleepTime; if (pVg->blockSleepForReplay > 0) { if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) { - tscError("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%"PRId64, tmq->consumerId, - pVg->vgId, pVg->blockSleepForReplay); + tscError("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64, + tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay); } } } @@ -2302,7 +2309,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever, tmq->consumerId, true); SMqBatchMetaRspObj* pRsp = NULL; - (void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp) ; + (void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp); taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; @@ -2349,9 +2356,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } // build rsp - int64_t numOfRows = 0; - SMqTaosxRspObj* pRsp = NULL; - if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) !=0 ) { + int64_t numOfRows = 0; + SMqTaosxRspObj* pRsp = NULL; + if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) != 0) { tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId); } tmq->totalRows += numOfRows; @@ -2471,7 +2478,7 @@ static void displayConsumeStatistics(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); } -static int32_t innerClose(tmq_t* tmq){ +static int32_t innerClose(tmq_t* tmq) { if (tmq->status != TMQ_CONSUMER_STATUS__READY) { tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId); return 0; @@ -2485,7 +2492,7 @@ static int32_t innerClose(tmq_t* tmq){ tmqSendHbReq((void*)(tmq->refId), NULL); tmq_list_t* lst = tmq_list_new(); - if (lst == NULL){ + if (lst == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = tmq_subscribe(tmq, lst); @@ -2499,7 +2506,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t code = 0; if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { code = innerClose(tmq); - if(code == 0){ + if (code == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); } } @@ -2514,12 +2521,12 @@ int32_t tmq_consumer_close(tmq_t* tmq) { int32_t code = 0; if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { code = innerClose(tmq); - if(code == 0){ + if (code == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); } } - if (code == 0){ + if (code == 0) { (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); } return code; @@ -2562,13 +2569,13 @@ const char* tmq_get_topic_name(TAOS_RES* res) { return NULL; } if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { - char *tmp = strchr(((SMqRspObjCommon*)res)->topic, '.'); + char* tmp = strchr(((SMqRspObjCommon*)res)->topic, '.'); if (tmp == NULL) { return NULL; } return tmp + 1; } else if (TD_RES_TMQ_META(res)) { - char *tmp = strchr(((SMqMetaRspObj*)res)->topic, '.'); + char* tmp = strchr(((SMqMetaRspObj*)res)->topic, '.'); if (tmp == NULL) { return NULL; } @@ -2584,13 +2591,13 @@ const char* tmq_get_db_name(TAOS_RES* res) { } if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { - char *tmp = strchr(((SMqRspObjCommon*)res)->db, '.'); + char* tmp = strchr(((SMqRspObjCommon*)res)->db, '.'); if (tmp == NULL) { return NULL; } return tmp + 1; } else if (TD_RES_TMQ_META(res)) { - char *tmp = strchr(((SMqMetaRspObj*)res)->db, '.'); + char* tmp = strchr(((SMqMetaRspObj*)res)->db, '.'); if (tmp == NULL) { return NULL; } @@ -2690,7 +2697,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { int32_t code = 0; SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); - if(pInfo == NULL) { + if (pInfo == NULL) { tscError("failed to allocate memory for sync commit"); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2836,7 +2843,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (param == NULL) { goto FAIL; } - + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { @@ -2857,7 +2864,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); if (pParam->sync) { SMqAskEpRsp rsp = {0}; - if(tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL){ + if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) { (void)doUpdateLocalEp(tmq, head->epoch, &rsp); } tDeleteSMqAskEpRsp(&rsp); @@ -2871,10 +2878,10 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; pWrapper->epoch = head->epoch; (void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); - if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) == NULL){ + if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) == NULL) { tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); - }else{ + } else { (void)taosWriteQitem(tmq->mqueue, pWrapper); } } @@ -3015,7 +3022,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes if (common->withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter); - if (pSW){ + if (pSW) { TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols)); } } @@ -3032,9 +3039,9 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes pRspObj->resInfo.precision = precision; pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows; - int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, pRspObj->resInfo.numOfRows, - convertUcs4); - if (code != 0){ + int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, + pRspObj->resInfo.numOfRows, convertUcs4); + if (code != 0) { return code; } *pResInfo = &pRspObj->resInfo; @@ -3062,18 +3069,18 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); code = tDecodeMqDataRsp(&decoder, &rsp); tDecoderClear(&decoder); - if (code != 0){ + if (code != 0) { goto END; } - SMqRspHead* pHead = pMsg->pData; + SMqRspHead* pHead = pMsg->pData; tmq_topic_assignment assignment = {.begin = pHead->walsver, .end = pHead->walever + 1, .currentOffset = rsp.common.rspOffset.version, .vgId = pParam->vgId}; (void)taosThreadMutexLock(&pCommon->mutex); - if(taosArrayPush(pCommon->pList, &assignment) == NULL){ + if (taosArrayPush(pCommon->pList, &assignment) == NULL) { tscError("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId, pParam->vgId, pCommon->pTopicName); code = TSDB_CODE_TSC_INTERNAL_ERROR; @@ -3184,7 +3191,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep taosMemoryFree(sendInfo); return TSDB_CODE_OUT_OF_MEMORY; } - if (tsem2_init(&pParam->sem, 0, 0) != 0){ + if (tsem2_init(&pParam->sem, 0, 0) != 0) { taosMemoryFree(buf); taosMemoryFree(sendInfo); taosMemoryFree(pParam); @@ -3198,8 +3205,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep sendInfo->fp = tmCommittedCb; sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; - int64_t transporterId = 0; - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo); if (code != 0) { (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); @@ -3348,7 +3354,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = NULL; - int32_t code = getTopicByName(tmq, tname, &pTopic); + int32_t code = getTopicByName(tmq, tname, &pTopic); if (code != 0) { tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); goto end; @@ -3358,10 +3364,10 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg == NULL){ + if (pClientVg == NULL) { continue; } - int32_t type = pClientVg->offsetInfo.beginOffset.type; + int32_t type = pClientVg->offsetInfo.beginOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -3381,7 +3387,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg == NULL){ + if (pClientVg == NULL) { continue; } if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) { @@ -3410,7 +3416,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - if (tsem2_init(&pCommon->rsp, 0, 0) != 0){ + if (tsem2_init(&pCommon->rsp, 0, 0) != 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -3420,7 +3426,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t i = 0; i < (*numOfAssignment); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg == NULL){ + if (pClientVg == NULL) { continue; } SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); @@ -3475,13 +3481,12 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->fp = tmqGetWalInfoCb; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; - int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo); if (code != 0) { goto end; } @@ -3504,7 +3509,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg == NULL){ + if (pClientVg == NULL) { continue; } if (pClientVg->vgId != p->vgId) { @@ -3631,7 +3636,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ taosMemoryFree(sendInfo); return TSDB_CODE_OUT_OF_MEMORY; } - if (tsem2_init(&pParam->sem, 0, 0) != 0){ + if (tsem2_init(&pParam->sem, 0, 0) != 0) { taosMemoryFree(msg); taosMemoryFree(sendInfo); taosMemoryFree(pParam); @@ -3645,8 +3650,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->fp = tmqSeekCb; sendInfo->msgType = TDMT_VND_TMQ_SEEK; - int64_t transporterId = 0; - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index a312dce164..11ad36dec6 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -56,13 +56,13 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu } for (int32_t i = 0; i < taskNum; ++i) { - int32_t* taskId = taosArrayGet(cbParam->taskId, i); + int32_t* taskId = taosArrayGet(cbParam->taskId, i); if (NULL == taskId) { ctgError("taosArrayGet %d taskId failed, total:%d", i, (int32_t)taosArrayGetSize(cbParam->taskId)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i); + int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i); if (NULL == msgIdx) { ctgError("taosArrayGet %d msgIdx failed, total:%d", i, (int32_t)taosArrayGetSize(cbParam->msgIdx)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -114,7 +114,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); - (void)(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode)); // error handled internal + (void)(*gCtgAsyncFps[pTask->type].handleRspFp)( + &tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode)); // error handled internal } CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); @@ -417,12 +418,12 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); } else { - int32_t* taskId = taosArrayGet(cbParam->taskId, 0); + int32_t* taskId = taosArrayGet(cbParam->taskId, 0); if (NULL == taskId) { ctgError("taosArrayGet %d taskId failed, total:%d", 0, (int32_t)taosArrayGetSize(cbParam->taskId)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - + SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); if (NULL == pTask) { ctgError("taosArrayGet %d SCtgTask failed, total:%d", *taskId, (int32_t)taosArrayGetSize(pJob->pTasks)); @@ -445,7 +446,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { ctgError("get task %d SCtgMsgCtx failed, taskType:%d", -1, pTask->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - + pMsgCtx->pBatchs = pBatchs; #endif @@ -526,8 +527,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, pMsgSendInfo->msgInfo.handle = NULL; pMsgSendInfo->msgType = msgType; - int64_t transporterId = 0; - code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo); pMsgSendInfo = NULL; if (code) { ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code)); @@ -558,9 +558,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ctgError("get task %d SCtgMsgCtx failed, taskType:%d", tReq->msgIdx, pTask->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - - SHashObj* pBatchs = pMsgCtx->pBatchs; - SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); + + SHashObj* pBatchs = pMsgCtx->pBatchs; + SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); if (NULL == pBatch) { newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg)); newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t)); @@ -599,7 +599,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName)); - } else if (CTG_TASK_GET_TB_TSMA == pTask->type){ + } else if (CTG_TASK_GET_TB_TSMA == pTask->type) { SCtgTbTSMACtx* pCtx = pTask->taskCtx; SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); @@ -616,10 +616,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbTSMACtx* pCtx = pTask->taskCtx; SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); if (NULL == pFetch) { - ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx, (int32_t)taosArrayGetSize(pCtx->pFetches)); + ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx, + (int32_t)taosArrayGetSize(pCtx->pFetches)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); if (NULL == pTbReq) { ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -675,7 +676,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName)); - } else if (CTG_TASK_GET_TB_TSMA == pTask->type){ + } else if (CTG_TASK_GET_TB_TSMA == pTask->type) { SCtgTbTSMACtx* pCtx = pTask->taskCtx; SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); @@ -689,22 +690,23 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT pName = ctx->pName; } } else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) { - SCtgTbTSMACtx* pCtx = pTask->taskCtx; - SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); - if (NULL == pFetch) { - ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx, (int32_t)taosArrayGetSize(pCtx->pFetches)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); - if (NULL == pTbReq) { - ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); - if (NULL == pName) { - ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); + if (NULL == pFetch) { + ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx, + (int32_t)taosArrayGetSize(pCtx->pFetches)); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + if (NULL == pTbReq) { + ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames)); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); + if (NULL == pName) { + ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables)); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } } else { ctgError("invalid vnode msgType %d", msgType); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); @@ -1629,9 +1631,9 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* } int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out, - SCtgTaskReq* tReq, int32_t reqType) { - char* msg = NULL; - int32_t msgLen = 0; + SCtgTaskReq* tReq, int32_t reqType) { + char* msg = NULL; + int32_t msgLen = 0; SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; @@ -1720,7 +1722,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen)); #else - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; (void)tNameGetFullDbName(pTbName, dbFName); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { @@ -1731,7 +1733,8 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen)); + CTG_RET( + ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen)); #endif } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 6f226ecb21..9555fef2ee 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -61,14 +61,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { if (code) { pInserter->submitRes.code = code; } - + if (code == TSDB_CODE_SUCCESS) { pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2)); if (NULL == pInserter->submitRes.pRsp) { pInserter->submitRes.code = terrno; goto _return; } - + SDecoder coder = {0}; tDecoderInit(&coder, pMsg->pData, pMsg->len); code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp); @@ -108,7 +108,7 @@ _return: (void)tsem_post(&pInserter->ready); taosMemoryFree(pMsg->pData); - + return TSDB_CODE_SUCCESS; } @@ -136,8 +136,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->fp = inserterCallback; - int64_t transporterId = 0; - return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo); + return asyncSendMsgToServer(pTransporter, pEpset, NULL, pMsgSendInfo); } static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) { @@ -166,7 +165,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int } else { taosMemoryFree(pBuf); } - + return code; } @@ -228,7 +227,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; goto _end; } - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_NCHAR: @@ -327,11 +326,11 @@ _end: tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } - + return terrno; } *ppReq = pReq; - + return TSDB_CODE_SUCCESS; } @@ -458,7 +457,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->explain = pInserterNode->explain; int64_t suid = 0; - int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); + int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, + &inserter->pSchema, &suid); if (code) { terrno = code; goto _return; @@ -480,9 +480,9 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); if (NULL == inserter->pCols) { - goto _return; + goto _return; } - + SNode* pNode = NULL; int32_t i = 0; FOREACH(pNode, pInserterNode->pCols) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index d8a2331980..7b55aff5ca 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2118,8 +2118,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->requestId = pTaskInfo->id.queryId; - int64_t transporterId = 0; - code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pTaskInfo->code = code; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 14118f189b..15f2a7b500 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -997,8 +997,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery pTask->lastMsgType = msgType; } - int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx); pMsgSendInfo = NULL; if (code) { SCH_ERR_JRET(code); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index df7b4f8fcf..078ede9d73 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -114,6 +114,7 @@ typedef struct SExHandle { void* handle; int64_t refId; void* pThrd; + int8_t pThrdIdx; queue q; int8_t inited; SRWLatch latch; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c3808fba58..90b47e1ef8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -96,9 +96,16 @@ typedef struct SCliConn { int32_t seq; int32_t shareCnt; - int8_t registered; + int8_t registered; + int8_t connnected; + SHashObj* pQTable; } SCliConn; +typedef struct { + SCliConn* conn; + void* arg; +} SReqState; + typedef struct SCliReq { SReqCtx* ctx; STransMsg msg; @@ -146,6 +153,8 @@ typedef struct SCliThrd { int32_t (*initCb)(void* arg, SCliReq* pReq, STransMsg* pResp); int32_t (*notifyCb)(void* arg, SCliReq* pReq, STransMsg* pResp); int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp); + + SHashObj* pIdConnTable; } SCliThrd; typedef struct SCliObj { @@ -229,7 +238,6 @@ static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later -static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn, int32_t code); static void cliReleaseUnfinishedMsg(SCliConn* conn); @@ -257,6 +265,9 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); +int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn); +int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); + static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); @@ -479,7 +490,7 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int8_t cliMayRecycleConn(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - if (transQueueSize(&conn->reqs) == 0) { + if (transQueueSize(&conn->reqs) == 0 && taosHashGetSize(conn->pQTable) == 0) { (void)delConnFromHeapCache(pThrd->connHeapCache, conn); addConnToPool(pThrd->pool, conn); return 1; @@ -499,12 +510,41 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe pResp->info.seqNum = htonl(pHead->seqNum); return 0; } +int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) { + int32_t code = 0; + SCliThrd* pThrd = conn->hostThrd; + if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { + int64_t qId = taosHton64(pHead->qid); + code = taosHashRemove(conn->pQTable, &qId, sizeof(qId)); + if (code != 0) { + tDebug("%s conn %p failed to release req %ld from conn", CONN_GET_INST_LABEL(conn), conn, qId); + } + + code = taosHashRemove(pThrd->pIdConnTable, &qId, sizeof(qId)); + if (code != 0) { + tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId); + } + tDebug("%s conn %p release req %ld", CONN_GET_INST_LABEL(conn), conn, qId); + + for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* pReqs = transQueueGet(&conn->reqs, i); + if (pReqs->msg.info.qId == qId) { + transQueueRm(&conn->reqs, i); + destroyReq(pReqs); + i--; + } + } + return 1; + } + return 0; +} void cliHandleResp2(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; cliResetConnTimer(conn); + SCliReq* pReq = NULL; STransMsgHead* pHead = NULL; int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0); @@ -522,39 +562,39 @@ void cliHandleResp2(SCliConn* conn) { return; } + if (cliConnMayHandleReleasReq(conn, pHead)) { + if (cliMayRecycleConn(conn)) { + return; + } + + return; + } + int64_t qId = taosHton64(pHead->qid); + pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - SCliReq* pReq = NULL; - int32_t seq = htonl(pHead->seqNum); + int32_t seq = htonl(pHead->seqNum); + code = cliGetReqBySeq(conn, seq, &pReq); if (code != 0) { - if (cliConnRmReleaseReq(conn, pHead)) { - return; - } else { - } tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, tstrerror(code)); // TODO: notify cb - if (cliMayRecycleConn(conn)) { return; } return; } - // TODO handle release req - // if (cliRecvReleaseReq(conn, pHead)) { - // return; - // } - STransMsg resp = {0}; code = cliBuildRespFromCont(pReq, &resp, pHead); + STraceId* trace = &resp.info.traceId; if (code != 0) { - tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); + tGDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); } else { - tDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d", CONN_GET_INST_LABEL(conn), conn, - TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%ld", CONN_GET_INST_LABEL(conn), conn, + TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId); } code = cliNotifyCb(conn, pReq, &resp); @@ -571,122 +611,6 @@ void cliHandleResp2(SCliConn* conn) { (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } -// void cliHandleResp(SCliConn* conn) { -// int32_t code = 0; -// SCliThrd* pThrd = conn->hostThrd; -// STrans* pInst = pThrd->pInst; - -// cliResetConnTimer(conn); - -// STransMsgHead* pHead = NULL; - -// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; -// int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf); -// if (msgLen <= 0) { -// taosMemoryFree(pHead); -// tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); -// // TODO: notify cb -// pThrd->notifyExceptCb(pThrd, NULL, NULL); -// return; -// } - -// if (resetBuf == 0) { -// tTrace("%s conn %p not reset read buf", transLabel(pInst), conn); -// } - -// if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { -// tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); -// // TODO: notify cb -// } -// pHead->code = htonl(pHead->code); -// pHead->msgLen = htonl(pHead->msgLen); -// if (cliRecvReleaseReq(conn, pHead)) { -// return; -// } - -// STransMsg transMsg = {0}; -// transMsg.contLen = transContLenFromMsg(pHead->msgLen); -// transMsg.pCont = transContFromHead((char*)pHead); -// transMsg.code = pHead->code; -// transMsg.msgType = pHead->msgType; -// transMsg.info.ahandle = NULL; -// transMsg.info.traceId = pHead->traceId; -// transMsg.info.hasEpSet = pHead->hasEpSet; -// transMsg.info.cliVer = htonl(pHead->compatibilityVer); - -// SCliReq* pReq = NULL; -// SReqCtx* pCtx = NULL; -// if (CONN_NO_PERSIST_BY_APP(conn)) { -// pReq = transQueuePop(&conn->reqs); - -// pCtx = pReq ? pReq->ctx : NULL; -// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; -// tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); -// } else { -// uint64_t ahandle = (uint64_t)pHead->ahandle; -// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); -// if (pReq == NULL) { -// transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); -// tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn, -// transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); -// if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { -// transMsg.code = TSDB_CODE_RPC_BROKEN_LINK; -// transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); -// tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn, -// transMsg.info.ahandle); -// } -// } else { -// pCtx = pReq->ctx; -// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; -// tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); -// } -// } -// // buf's mem alread translated to transMsg.pCont -// if (!CONN_NO_PERSIST_BY_APP(conn)) { -// transMsg.info.handle = (void*)conn->refId; -// transMsg.info.refId = (int64_t)(void*)conn->refId; -// tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); -// } - -// STraceId* trace = &transMsg.info.traceId; -// tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, -// TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); - -// if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { -// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); -// transFreeMsg(transMsg.pCont); -// return; -// } -// if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { -// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); -// transFreeMsg(transMsg.pCont); -// return; -// } - -// if (pReq == NULL || (pReq && pReq->type != Release)) { -// if (cliNotifyCb(conn, pReq, &transMsg) != 0) { -// return; -// } -// } -// int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); -// tDebug("conn %p msg refId: %" PRId64 "", conn, refId); -// destroyReq(pReq); - -// if (cliConnSendSeqMsg(refId, conn)) { -// return; -// } - -// if (cliMaySendCachedMsg(conn) == true) { -// return; -// } - -// if (CONN_NO_PERSIST_BY_APP(conn)) { -// return addConnToPool(pThrd->pool, conn); -// } - -// (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); -// } - void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { // if (transQueueEmpty(&pConn->reqs)) { // if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { @@ -1266,12 +1190,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); - if (pReq->msg.info.handle != 0) { - // SExHandle *p = transAcquireExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); - // TAOS_CHECK_GOTO(specifyConnRef(pConn, false, pReq->msg.info.handle), NULL, _exception); - // } else { - // TAOS_CHECK_GOTO(allocConnRef(pConn, false), NULL, _exception); - } + code = cliMayUpdateState(pThrd, pReq, pConn); transQueuePush(&pConn->reqs, pReq); return cliDoConn(pThrd, pConn); @@ -1316,11 +1235,13 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; - conn->status = ConnNormal; - conn->broken = false; transRefCliHandle(conn); conn->seq = 0; + conn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (conn->pQTable == NULL) { + TAOS_CHECK_GOTO(terrno, NULL, _failed); + } TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); TAOS_CHECK_GOTO(cliGetConnTimer(pThrd, conn), &lino, _failed); @@ -1347,6 +1268,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int _failed: if (conn) { taosMemoryFree(conn->stream); + taosHashCleanup(conn->pQTable); (void)transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqs); taosMemoryFree(conn->dstAddr); @@ -1405,15 +1327,15 @@ static void cliDestroy(uv_handle_t* handle) { (void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetRefMgt(), conn->refId); } + delConnFromHeapCache(pThrd->connHeapCache, conn); taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); cliDestroyConnMsgs(conn, true); - delConnFromHeapCache(pThrd->connHeapCache, conn); - tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); + taosHashCleanup(conn->pQTable); (void)transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); @@ -1457,6 +1379,7 @@ static void cliConnRmReqs(SCliConn* conn) { if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { transQueueRm(&conn->reqs, i); destroyReq(pReq); + i--; } } } @@ -1536,6 +1459,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { } pHead->timestamp = taosHton64(taosGetTimestampUs()); pHead->seqNum = htonl(pConn->seq); + pHead->qid = taosHton64(pReq->info.qId); if (pHead->comp == 0) { if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { @@ -1552,8 +1476,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pCliMsg->seq = pConn->seq; STraceId* trace = &pCliMsg->msg.info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq); + tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); } if (j == 0) { taosMemoryFree(wb); @@ -1571,7 +1495,11 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; transQueuePush(&pConn->reqs, pCliMsg); - code = cliSend2(pConn); + if (pConn->connnected) { + code = cliSend2(pConn); + } else { + // do nothing + } return code; } @@ -1734,6 +1662,7 @@ void cliConnCb(uv_connect_t* req, int status) { // } return; } + pConn->connnected = 1; cliConnSetSockInfo(pConn); @@ -2012,40 +1941,74 @@ int32_t cliConnHandleQueryById(SCliReq* pReq) { int64_t queryId = (int64_t)pReq->msg.info.handle; SExHandle* exh = transAcquireExHandle(transGetRefMgt(), queryId); if (exh->inited == 1) { - } else { } transReleaseExHandle(transGetRefMgt(), queryId); } return 0; } + +int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { + int32_t code = 0; + int64_t qid = pReq->msg.info.qId; + if (qid == 0) { + return TSDB_CODE_RPC_NO_STATE; + } + + SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); + if (pState == NULL) { + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; + } else { + *pConn = pState->conn; + } + return code; +} + +int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { + int32_t code = 0; + int64_t qid = pReq->msg.info.qId; + if (qid == 0) { + return TSDB_CODE_RPC_NO_STATE; + } + SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); + if (pState != 0) { + ASSERT(0); + } + SReqState state = {.conn = pConn, .arg = NULL}; + code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); + return code; +} void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { int32_t lino = 0; STransMsg resp = {0}; int32_t code = (pThrd->initCb)(pThrd, pReq, NULL); TAOS_CHECK_GOTO(code, &lino, _exception); - char addr[TSDB_FQDN_LEN + 64] = {0}; - char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); - CONN_CONSTRUCT_HASH_KEY(addr, ip, port); - STrans* pInst = pThrd->pInst; SCliConn* pConn = NULL; + code = cliMayGetHandleState(pThrd, pReq, &pConn); - pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); - if (pConn == NULL) { - code = cliGetOrCreateConn(pThrd, pReq, &pConn); - if (code == TSDB_CODE_RPC_MAX_SESSIONS) { - TAOS_CHECK_GOTO(code, &lino, _exception); - } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { - // do nothing, notiy - return; - } else { - ASSERT(code == 0); - addConnToHeapCache(pThrd->connHeapCache, pConn); + if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + char addr[TSDB_FQDN_LEN + 64] = {0}; + char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + CONN_CONSTRUCT_HASH_KEY(addr, ip, port); + + pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); + if (pConn == NULL) { + code = cliGetOrCreateConn(pThrd, pReq, &pConn); + if (code == TSDB_CODE_RPC_MAX_SESSIONS) { + TAOS_CHECK_GOTO(code, &lino, _exception); + } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + // do nothing, notiy + return; + } else { + ASSERT(code == 0); + addConnToHeapCache(pThrd->connHeapCache, pConn); + } } } + code = cliMayUpdateState(pThrd, pReq, pConn); code = cliSendReq(pConn, pReq); tTrace("%s conn %p ready", pInst->label, pConn); @@ -2550,6 +2513,11 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + pThrd->pIdConnTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pThrd->connHeapCache == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + pThrd->initCb = initCb; pThrd->notifyCb = notfiyCb; pThrd->notifyExceptCb = notifyExceptCb; @@ -2576,6 +2544,7 @@ _end: taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); taosHashCleanup(pThrd->batchCache); + taosHashCleanup(pThrd->pIdConnTable); taosMemoryFree(pThrd); } @@ -2632,6 +2601,8 @@ static void destroyThrdObj(SCliThrd* pThrd) { } taosHashCleanup(pThrd->connHeapCache); + taosHashCleanup(pThrd->pIdConnTable); + taosMemoryFree(pThrd); } @@ -3111,24 +3082,26 @@ int32_t transReleaseCliHandle(void* handle) { return TSDB_CODE_RPC_BROKEN_LINK; } - STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527}; + STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE, + .info.handle = handle, + .info.ahandle = (void*)0x9527, + .info.qId = (int64_t)handle}; TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pCtx->ahandle = tmsg.info.ahandle; - SCliReq* cmsg = taosMemoryCalloc(1, sizeof(SCliReq)); + if (cmsg == NULL) { taosMemoryFree(pCtx); return TSDB_CODE_OUT_OF_MEMORY; } cmsg->msg = tmsg; cmsg->st = taosGetTimestampUs(); - cmsg->type = Release; + cmsg->type = Normal; cmsg->ctx = pCtx; STraceId* trace = &tmsg.info.traceId; @@ -3188,31 +3161,7 @@ int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception); } - if (handle != 0) { - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); - if (exh != NULL) { - taosWLockLatch(&exh->latch); - if (exh->handle == NULL && exh->inited != 0) { - SCliReq* pCliMsg = NULL; - code = transInitMsg(pInstRef, pEpSet, pReq, ctx, &pCliMsg); - if (code != 0) { - taosWUnLockLatch(&exh->latch); - (void)transReleaseExHandle(transGetRefMgt(), handle); - TAOS_CHECK_GOTO(code, NULL, _exception); - } - - QUEUE_PUSH(&exh->q, &pCliMsg->seqq); - taosWUnLockLatch(&exh->latch); - tDebug("msg refId: %" PRId64 "", handle); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); - return 0; - } else { - exh->inited = 1; - taosWUnLockLatch(&exh->latch); - (void)transReleaseExHandle(transGetRefMgt(), handle); - } - } - } + pReq->info.qId = handle; SCliReq* pCliMsg = NULL; TAOS_CHECK_GOTO(transInitMsg(pInstRef, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception); @@ -3259,6 +3208,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* } pReq->info.handle = (void*)(*transpointId); + pReq->info.qId = *transpointId; SCliReq* pCliMsg = NULL; TAOS_CHECK_GOTO(transInitMsg(pInstRef, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception); @@ -3597,6 +3547,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { tDebug("release conn id %" PRId64 "", transpointId); STransMsg msg = {.info.handle = (void*)transpointId}; + msg.info.qId = transpointId; pCli->msg = msg; code = transAsyncSend(pThrd->asyncPool, &pCli->q); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 85d064b68f..ad080f655a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -71,7 +71,7 @@ typedef struct SSvrMsg { int32_t seqNum; void* arg; FilteFunc func; - + int8_t sent; } SSvrMsg; typedef struct { @@ -423,16 +423,40 @@ static int8_t uvValidConn(SSvrConn* pConn) { } return forbiddenIp; } -static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) { - if (pConn->status == ConnNormal) { - if (pHead->persist == 1) { - pConn->status = ConnAcquire; - transRefSrvHandle(pConn); - tDebug("conn %p acquired by server app", pConn); - } else if (pHead->noResp == 0) { - transRefSrvHandle(pConn); + +static int32_t uvHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { + int32_t code = 0; + STrans* pInst = pConn->pInst; + if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { + int64_t qId = taosHton64(pHead->qid); + void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); + if (p == NULL) { + code = TSDB_CODE_RPC_NO_STATE; + tTrace("conn %p recv release, and releady release by server qid%ld", pConn, qId); + // notify cli already release, cli release resouce + } else { + SSvrRegArg* arg = p; + (pInst->cfp)(pInst->parent, &(arg->msg), NULL); + tTrace("conn %p recv release, notify server app, qid%ld", pConn, qId); + (void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); } + + STransMsg tmsg = {.code = code, + .msgType = pHead->msgType + 1, + .info.qId = qId, + .info.traceId = pHead->traceId, + .info.seqNum = htonl(pHead->seqNum)}; + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Normal; + srvMsg->pConn = pConn; + + transQueuePush(&pConn->srvMsgs, srvMsg); + + uvStartSendRespImpl(srvMsg); + return 1; } + return 0; } static bool uvHandleReq(SSvrConn* pConn) { STrans* pInst = pConn->pInst; @@ -440,8 +464,8 @@ static bool uvHandleReq(SSvrConn* pConn) { STransMsgHead* pHead = NULL; - int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; - int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf); + int8_t resetBuf = 0; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, 0); if (msgLen <= 0) { tError("%s conn %p read invalid packet", transLabel(pInst), pConn); return false; @@ -469,7 +493,7 @@ static bool uvHandleReq(SSvrConn* pConn) { } } - if (uvRecvReleaseReq(pConn, pHead)) { + if (uvHandleReleaseReq(pConn, pHead)) { return true; } @@ -478,14 +502,14 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - transMsg.info.qId = htole64(pHead->qid); + transMsg.info.qId = taosHton64(pHead->qid); if (transMsg.info.qId > 0) { - int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg)); - if (code != 0) { - tError("%s conn %p failed to put msg to req dict, since %s", transLabel(pInst), pConn, tstrerror(code)); - return false; - } + // int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg)); + // if (code != 0) { + // tError("%s conn %p failed to put msg to req dict, since %s", transLabel(pInst), pConn, tstrerror(code)); + // return false; + // } } if (pHead->seqNum == 0) { @@ -595,49 +619,21 @@ void uvOnSendCb(uv_write_t* req, int status) { if (conn == NULL) return; if (status == 0) { - tTrace("conn %p data already was written on stream", conn); - if (!transQueueEmpty(&conn->srvMsgs)) { - SSvrMsg* msg = transQueuePop(&conn->srvMsgs); - STraceId* trace = &msg->msg.info.traceId; - tGDebug("conn %p write data out", conn); - - destroySmsg(msg); - // send cached data - if (!transQueueEmpty(&conn->srvMsgs)) { - msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); - if (msg->type == Register && conn->status == ConnAcquire) { - if (conn->regArg.init) { - transFreeMsg(conn->regArg.msg.pCont); - conn->regArg.init = 0; - } - conn->regArg.notifyCount = 0; - conn->regArg.init = 1; - conn->regArg.msg = msg->msg; - if (conn->broken) { - STrans* pInst = conn->pInst; - (pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL); - memset(&conn->regArg, 0, sizeof(conn->regArg)); - } - (void)transQueuePop(&conn->srvMsgs); - taosMemoryFree(msg); - - msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); - if (msg != NULL) { - uvStartSendRespImpl(msg); - } - } else { - uvStartSendRespImpl(msg); - } + for (int32_t i = 0; i < transQueueSize(&conn->srvMsgs); i++) { + SSvrMsg* smsg = transQueueGet(&conn->srvMsgs, i); + if (smsg->sent == 1) { + transQueueRm(&conn->srvMsgs, i); + destroySmsg(smsg); + i--; } } - transUnrefSrvHandle(conn); } else { if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) { tError("conn %p failed to write data, %s", conn, uv_err_name(status)); conn->broken = true; - transUnrefSrvHandle(conn); } } + transUnrefSrvHandle(conn); } static void uvOnPipeWriteCb(uv_write_t* req, int status) { STUB_RAND_NETWORK_ERR(status); @@ -662,7 +658,6 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { if (pMsg->pCont == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); @@ -673,30 +668,18 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer); pHead->version = TRANS_VER; pHead->seqNum = htonl(pMsg->info.seqNum); + pHead->qid = taosHton64(pMsg->info.qId); // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - (void)transQueuePop(&pConn->srvMsgs); - destroySmsg(smsg); - return TSDB_CODE_INVALID_MSG; + ASSERT(0); + // (void)transQueuePop(&pConn->srvMsgs); + // destroySmsg(smsg); + // return TSDB_CODE_INVALID_MSG; } - // if (pConn->status == ConnNormal) { - // pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); - // if (smsg->type == Release) pHead->msgType = 0; - // } else { - // if (smsg->type == Release) { - // pHead->msgType = 0; - // pConn->status = ConnNormal; - // destroyConnRegArg(pConn); - // transUnrefSrvHandle(pConn); - // } else { - // // set up resp msg type - // pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); - // } - // } - pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); + // pHead->msgType = pMsg->msgType; pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead)); @@ -712,22 +695,60 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { } STraceId* trace = &pMsg->info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType), - pConn->dst, pConn->src, len); + tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%d, qid:%ld", transLabel(pInst), pConn, + TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId); wb->base = (char*)pHead; wb->len = len; return 0; } +static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum) { + int32_t count = 0; + + int32_t size = transQueueSize(&pConn->srvMsgs); + uv_buf_t* pWb = taosMemoryCalloc(size, sizeof(uv_buf_t)); + if (pWb == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < size; i++) { + SSvrMsg* pMsg = transQueueGet(&pConn->srvMsgs, i); + if (pMsg->sent == 1) { + continue; + } + uv_buf_t wb; + (void)uvPrepareSendData(pMsg, &wb); + pWb[count] = wb; + + pMsg->sent = 1; + count++; + } + + if (count == 0) { + taosMemoryFree(pWb); + return 0; + } + + *bufNum = count; + *ppBuf = pWb; + + return 0; +} static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { + int32_t code = 0; SSvrConn* pConn = smsg->pConn; if (pConn->broken) { return; } - uv_buf_t wb; - if (uvPrepareSendData(smsg, &wb) < 0) { + uv_buf_t* pBuf = NULL; + int32_t bufNum = 0; + code = uvBuildToSendData(pConn, &pBuf, &bufNum); + if (code != 0) { + tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn); + return; + } + if (bufNum == 0) { return; } @@ -741,24 +762,33 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { return; } } - (void)uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); + (void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb); + taosMemoryFree(pBuf); +} +int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) { + SSvrConn* pConn = pMsg->pConn; + if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE) { + int64_t qid = pMsg->msg.info.qId; + SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); + if (p == NULL) { + tError("%s conn %p already release qid %ld", transLabel(pConn->pInst), pConn, qid); + return TSDB_CODE_RPC_NO_STATE; + } else { + transFreeMsg(p->msg.pCont); + taosHashRemove(pConn->pQTable, &qid, sizeof(qid)); + } + } + return 0; } static void uvStartSendResp(SSvrMsg* smsg) { // impl SSvrConn* pConn = smsg->pConn; - if (pConn->broken == true) { - // persist by + if (uvConnMayHandlsReleaseMsg(smsg) == TSDB_CODE_RPC_NO_STATE) { destroySmsg(smsg); - transUnrefSrvHandle(pConn); return; } - if (pConn->status == ConnNormal) { - transUnrefSrvHandle(pConn); - } - if (!transQueuePush(&pConn->srvMsgs, smsg)) { - return; - } + transQueuePush(&pConn->srvMsgs, smsg); uvStartSendRespImpl(smsg); return; } @@ -1199,7 +1229,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId); - pConn->pQTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + pConn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pConn->pQTable == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } @@ -1540,19 +1570,20 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { taosMemoryFree(msg); } void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { - int32_t code = 0; - SSvrConn* conn = msg->pConn; - if (conn->status == ConnAcquire) { - if (!transQueuePush(&conn->srvMsgs, msg)) { - return; - } - uvStartSendRespImpl(msg); - return; - } else if (conn->status == ConnRelease || conn->status == ConnNormal) { - tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn); - } + ASSERT(0); + // int32_t code = 0; + // SSvrConn* conn = msg->pConn; + // if (conn->status == ConnAcquire) { + // if (!transQueuePush(&conn->srvMsgs, msg)) { + // return; + // } + // uvStartSendRespImpl(msg); + // return; + // } else if (conn->status == ConnRelease || conn->status == ConnNormal) { + // tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn); + // } - destroySmsg(msg); + // destroySmsg(msg); } void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { // send msg to client @@ -1697,6 +1728,7 @@ int32_t transReleaseSrvHandle(void* handle) { int32_t code = 0; SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; + int64_t qId = info->qId; int64_t refId = info->refId; ASYNC_CHECK_HANDLE(info->refIdMgt, refId, exh); @@ -1704,7 +1736,8 @@ int32_t transReleaseSrvHandle(void* handle) { SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; + STransMsg tmsg = { + .msgType = TDMT_SCH_TASK_RELEASE, .code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); if (m == NULL) { @@ -1713,9 +1746,10 @@ int32_t transReleaseSrvHandle(void* handle) { } m->msg = tmsg; - m->type = Release; + m->type = Normal; - tDebug("%s conn %p start to release", transLabel(pThrd->pInst), exh->handle); + tDebug("%s conn %p start to %p, qId:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType), + qId); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); (void)transReleaseExHandle(info->refIdMgt, refId); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e03913d0e7..8cec54ee68 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -60,6 +60,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit" TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")