diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 9c8c7a519f..d3cd77eb3f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -38,11 +38,14 @@ typedef struct SRpcConnInfo { typedef struct SRpcMsg { tmsg_t msgType; + tmsg_t expectMsgType; void * pCont; int contLen; int32_t code; - void *handle; // rpc handle returned to app - void *ahandle; // app handle set by client + void * handle; // rpc handle returned to app + void * ahandle; // app handle set by client + int noResp; // has response or not(default 0 indicate resp); + } SRpcMsg; typedef struct { diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 87edfb8dde..7b93f7580d 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -107,10 +107,8 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); // TODO: This is the basic params, and should wrap the params to a queryHandle. -int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, - int32_t nMaxResult); - +int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, + tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult); // STsdbCfg int tsdbOptionsInit(STsdbCfg *); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index a729288e34..df2d9604d2 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -636,7 +636,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { } // Decode - pCfg = (STSma *)malloc(sizeof(STSma)); + pCfg = (STSma *)calloc(1, sizeof(STSma)); if (pCfg == NULL) { return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index cd8b60385f..7a5c9c5c1e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -20,7 +20,7 @@ static const char *TSDB_SMA_DNAME[] = { "tsma", // TSDB_SMA_TYPE_TIME_RANGE "rsma", // TSDB_SMA_TYPE_ROLLUP }; - +#define SMA_CHECK_HASH #undef SMA_PRINT_DEBUG_LOG #define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_TIMES 10 @@ -72,35 +72,43 @@ typedef struct { struct SSmaStat { SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem + T_REF_DECLARE() }; // declaration of static functions -static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); -static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); -// TODO: This is the basic params, and should wrap the params to a queryHandle. -static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, - int32_t nMaxResult); -static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg); - +// expired window +static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); -static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); -static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); +static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey); +static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); +static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); +// read data +// TODO: This is the basic params, and should wrap the params to a queryHandle. +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, + int32_t nMaxResult); + +// insert data +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); +static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]); +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +// implementation static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]); } @@ -192,6 +200,21 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { return NULL; } +static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { + if (pStat == NULL) return 0; + int ref = T_REF_INC(pStat); + tsdbDebug("vgId:%d ref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref); + return 0; +} + +static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { + if (pStat == NULL) return 0; + + int ref = T_REF_DEC(pStat); + tsdbDebug("vgId:%d unref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref); + return 0; +} + static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { ASSERT(pSmaStat != NULL); @@ -296,7 +319,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { * @param msg * @return int32_t */ -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { if (!msg || !pTsdb->pMeta) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; @@ -307,27 +330,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { return TSDB_CODE_FAILED; } - SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, smaType); - - // TODO: decode the msg => start - int64_t indexUid = SMA_TEST_INDEX_UID; - // const char * indexName = SMA_TEST_INDEX_NAME; + // TODO: decode the msg from Stream Computing module => start + int64_t indexUid = SMA_TEST_INDEX_UID; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; - int64_t now = taosGetTimestampMs(); + TSKEY skey1 = 1646987196 * 1e3; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { - expiredWindows[i] = now + i; + expiredWindows[i] = skey1 + i; } - // TODO: decode the msg <= end + + SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); + tsdbRefSmaStat(pTsdb, pStat); SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state if (pItem == NULL) { // Response to stream computing: OOM - // For query, if the indexName not found, the TSDB should tell query module to query raw TS data. + // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } @@ -337,6 +361,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); free(pItem); + tsdbUnRefSmaStat(pTsdb, pStat); tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, tstrerror(terrno)); return TSDB_CODE_FAILED; @@ -347,18 +372,14 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { // If error occurs during put smaStatItem, free the resources of pItem taosHashCleanup(pItem->expiredWindows); free(pItem); + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } } -#if 0 - SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); - int size1 = taosHashGetSize(pItem1->expiredWindows); - tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1); -#endif int8_t state = TSDB_SMA_STAT_EXPIRED; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { - if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) { + if (taosHashPut(pItem->expiredWindows, (void *)(expiredWindows + i), sizeof(TSKEY), &state, sizeof(state)) != 0) { // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would // tell query module to query raw TS data. // N.B. @@ -368,37 +389,43 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { taosHashCleanup(pItem->expiredWindows); tfree(pItem->pSma); taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid)); + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } + tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid, + expiredWindows[i]); } -#if 0 - SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); - int size2 = taosHashGetSize(pItem1->expiredWindows); - tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2); -#endif - + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; } -static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) { +static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) { SSmaStatItem *pItem = NULL; - // TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode? + tsdbRefSmaStat(pTsdb, pStat); + if (pStat && pStat->smaStatItems) { - pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + pItem = *(SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); } -#if 0 if (pItem != NULL) { - // TODO: reset time window for the sma data blocks + // pItem resides in hash buffer all the time unless drop sma index + // TODO: multithread protect if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { // error handling + tsdbUnRefSmaStat(pTsdb, pStat); + tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " failed", REPO_ID(pTsdb), + skey, indexUid); + return TSDB_CODE_FAILED; } - } else { // error handling + tsdbUnRefSmaStat(pTsdb, pStat); + tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid); + return TSDB_CODE_FAILED; } -#endif + + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; } @@ -705,7 +732,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { // TODO:tsdbEndTSmaCommit(); // Step 3: reset the SSmaStat - tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); + tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); tsdbDestroyTSmaWriteH(&tSmaH); return TSDB_CODE_SUCCESS; @@ -751,7 +778,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // TODO:tsdbEndTSmaCommit(); // reset the SSmaStat - tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); + tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); return TSDB_CODE_SUCCESS; } @@ -839,7 +866,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { * @return int32_t */ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { if (!pTsdb->pTSmaEnv) { terrno = TSDB_CODE_INVALID_PTR; @@ -847,12 +874,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ return TSDB_CODE_FAILED; } - SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); + tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); if (pItem == NULL) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. + tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); terrno = TSDB_CODE_TDB_INVALID_ACTION; - tsdbWarn("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid); + tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid); return TSDB_CODE_FAILED; } @@ -866,16 +895,23 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ } #endif -#if 0 - if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) { +#if 1 + if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) { // TODO: mark this window as expired. + tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb), + querySKey, indexUid); + } else { + tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey, + indexUid); } + tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + #endif STSmaReadH tReadH = {0}; tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit); tsdbCloseDBF(&tReadH.dFile); - tsdbInitTSmaFile(&tReadH, querySkey); + tsdbInitTSmaFile(&tReadH, querySKey); if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno)); return TSDB_CODE_FAILED; @@ -883,7 +919,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ char smaKey[SMA_KEY_LEN] = {0}; void *pSmaKey = &smaKey; - tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey); + tsdbEncodeTSmaKey(tableUid, colId, querySKey, (void **)&pSmaKey); tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb), tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), @@ -1010,9 +1046,9 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { } int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, - tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) { + tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey, + if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySKey, nMaxResult)) < 0) { tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 96fd35e700..ae4e9b0a75 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -301,7 +301,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { break; } - char *msg = (char *)calloc(100, 1); + char *msg = (char *)calloc(1, 100); assert(msg != NULL); ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2c5c4810af..31097a591f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -158,9 +158,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } while (0) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) +#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) + static void* cliWorkThread(void* arg); -bool cliMayContinueSendMsg(SCliConn* conn) { +bool cliMaySendCachedMsg(SCliConn* conn) { if (taosArrayGetSize(conn->cliMsgs) > 0) { cliSend(conn); return true; @@ -226,7 +228,7 @@ void cliHandleResp(SCliConn* conn) { } destroyCmsg(pMsg); - if (cliMayContinueSendMsg(conn) == true) { + if (cliMaySendCachedMsg(conn) == true) { return; } @@ -441,7 +443,25 @@ static void cliDestroy(uv_handle_t* handle) { tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } - +static bool cliHandleNoResp(SCliConn* conn) { + bool res = false; + SArray* msgs = conn->cliMsgs; + if (taosArrayGetSize(msgs) > 0) { + SCliMsg* pMsg = taosArrayGetP(msgs, 0); + if (REQUEST_NO_RESP(&pMsg->msg)) { + taosArrayRemove(msgs, 0); + destroyCmsg(pMsg); + res = true; + } + if (res == true) { + if (cliMaySendCachedMsg(conn) == false) { + SCliThrdObj* thrd = conn->hostThrd; + addConnToPool(thrd->pool, conn); + } + } + } + return res; +} static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; @@ -452,6 +472,10 @@ static void cliSendCb(uv_write_t* req, int status) { cliHandleExcept(pConn); return; } + if (cliHandleNoResp(pConn) == true) { + tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn); + return; + } uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } @@ -489,6 +513,7 @@ void cliSend(SCliConn* pConn) { msgLen += sizeof(STransUserMsg); } + pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index ec42ab6402..2efdb109aa 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -226,15 +226,22 @@ static void uvHandleReq(SSrvConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; transMsg.ahandle = NULL; - transMsg.handle = pConn; + transMsg.handle = NULL; transClearBuffer(&pConn->readBuf); pConn->inType = pHead->msgType; - transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), - inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port), transMsg.contLen); + if (pHead->resflag == 0) { + transRefSrvHandle(pConn); + transMsg.handle = pConn; + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), + inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port), transMsg.contLen); + } else { + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, + TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + } STrans* pTransInst = (STrans*)p->shandle; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index bdbfb4a0ae..ec89d695a2 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -361,7 +361,7 @@ TEST_F(TransEnv, cliPersistHandle) { tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .noResp = 0}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -448,6 +448,25 @@ TEST_F(TransEnv, srvPersistHandleExcept) { // conn broken // } +TEST_F(TransEnv, cliPersistHandleExcept) { + tr->SetSrvContinueSend(processContinueSend); + tr->SetCliPersistFp(cliPersistHandle); + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i > 2) { + tr->StopSrv(); + break; + } + } + taosMsleep(2000); + // conn broken + // +} TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken @@ -458,5 +477,15 @@ TEST_F(TransEnv, queryExcept) { // query and conn is broken } TEST_F(TransEnv, noResp) { + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.noResp = 1}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + } + taosMsleep(2000); + // no resp }