Merge pull request #29336 from taosdata/enh/TD-33267

Enh/TD-33267
This commit is contained in:
Shengliang Guan 2024-12-27 09:04:06 +08:00 committed by GitHub
commit f96e3dc650
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 83 additions and 185 deletions

View File

@ -28,7 +28,7 @@
extern "C" {
#endif
typedef enum { MATCH, JUMP, SPLIT, RANGE } InstType;
typedef enum { INS_MATCH, INS_JUMP, INS_SPLIT, INS_RANGE } InstType;
typedef struct MatchValue {
#ifdef WINDOWS

View File

@ -159,14 +159,14 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
if (false == sparSetGet(set, i, &ip)) continue;
Inst *inst = taosArrayGet(builder->dfa->insts, ip);
if (inst->ty == JUMP || inst->ty == SPLIT) {
if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) {
continue;
} else if (inst->ty == RANGE) {
} else if (inst->ty == INS_RANGE) {
if (taosArrayPush(tinsts, &ip) == NULL) {
code = terrno;
goto _exception;
}
} else if (inst->ty == MATCH) {
} else if (inst->ty == INS_MATCH) {
isMatch = true;
if (taosArrayPush(tinsts, &ip) == NULL) {
code = terrno;
@ -234,11 +234,11 @@ void dfaAdd(FstDfa *dfa, FstSparseSet *set, uint32_t ip) {
}
bool succ = sparSetAdd(set, ip, NULL);
Inst *inst = taosArrayGet(dfa->insts, ip);
if (inst->ty == MATCH || inst->ty == RANGE) {
if (inst->ty == INS_MATCH || inst->ty == INS_RANGE) {
// do nothing
} else if (inst->ty == JUMP) {
} else if (inst->ty == INS_JUMP) {
dfaAdd(dfa, set, inst->jv.step);
} else if (inst->ty == SPLIT) {
} else if (inst->ty == INS_SPLIT) {
dfaAdd(dfa, set, inst->sv.len1);
dfaAdd(dfa, set, inst->sv.len2);
}
@ -253,11 +253,11 @@ bool dfaRun(FstDfa *dfa, FstSparseSet *from, FstSparseSet *to, uint8_t byte) {
if (false == sparSetGet(from, i, &ip)) continue;
Inst *inst = taosArrayGet(dfa->insts, ip);
if (inst->ty == JUMP || inst->ty == SPLIT) {
if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) {
continue;
} else if (inst->ty == MATCH) {
} else if (inst->ty == INS_MATCH) {
isMatch = true;
} else if (inst->ty == RANGE) {
} else if (inst->ty == INS_RANGE) {
if (inst->rv.start <= byte && byte <= inst->rv.end) {
dfaAdd(dfa, to, ip + 1);
}

View File

@ -17,6 +17,7 @@
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
#include "indexFstDfa.h"
class UtilEnv : public ::testing::Test {
protected:
@ -41,6 +42,29 @@ class UtilEnv : public ::testing::Test {
SArray *rslt;
};
class UtilComm : public ::testing::Test {
protected:
virtual void SetUp() {
// src = (SArray *)taosArrayInit(2, sizeof(void *));
// for (int i = 0; i < 3; i++) {
// SArray *m = taosArrayInit(10, sizeof(uint64_t));
// taosArrayPush(src, &m);
// }
// rslt = (SArray *)taosArrayInit(10, sizeof(uint64_t));
}
virtual void TearDown() {
// for (int i = 0; i < taosArrayGetSize(src); i++) {
// SArray *m = (SArray *)taosArrayGetP(src, i);
// taosArrayDestroy(m);
// }
// taosArrayDestroy(src);
}
// SArray *src;
// SArray *rslt;
};
static void clearSourceArray(SArray *p) {
for (int i = 0; i < taosArrayGetSize(p); i++) {
SArray *m = (SArray *)taosArrayGetP(p, i);
@ -369,3 +393,35 @@ TEST_F(UtilEnv, testDictComm) {
EXPECT_EQ(COMMON_INPUTS[v], i);
}
}
TEST_F(UtilComm, testCompress) {
for (int32_t i = 0; i < 6; i++) {
_cache_range_compare cmpFunc = idxGetCompare((RangeType)(i));
//char[32]a = 0, b = 1;
char a[32] = {0};
char b[32] = {1};
for (int32_t j = 0; j < TSDB_DATA_TYPE_MAX; j++) {
cmpFunc(a, b, j);
}
}
}
TEST_F(UtilComm, testfstDfa) {
{
FstDfaBuilder *builder = dfaBuilderCreate(NULL);
ASSERT_TRUE(builder != NULL);
dfaBuilderDestroy(builder);
}
{
SArray *pInst = taosArrayInit(32, sizeof(uint8_t));
for (int32_t i = 0; i < 26; i++) {
uint8_t v = 'a' + i;
taosArrayPush(pInst, &v);
}
FstDfaBuilder *builder = dfaBuilderCreate(pInst);
FstDfa *dfa = dfaBuilderBuild(builder);
dfaBuilderDestroy(builder);
}
}

View File

@ -151,7 +151,6 @@ typedef struct SCliThrd {
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue;
SDelayQueue* waitConnQueue;
uint64_t nextTimeout; // next timeout
STrans* pInst; //
@ -159,8 +158,6 @@ typedef struct SCliThrd {
SHashObj* fqdn2ipCache;
SCvtAddr* pCvtAddr;
SHashObj* failFastCache;
SHashObj* batchCache;
SHashObj* connHeapCache;
SCliReq* stopMsg;
@ -224,8 +221,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleException(SCliConn* conn);
@ -2124,144 +2119,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) {
tTrace("cli process batch size:%d", count);
}
}
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
return NULL;
}
queue* hr = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(hr);
pList->sending += 1;
pList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
return batch;
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq);
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port);
static void destroyBatchList(SCliBatchList* pList);
static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
int32_t code = 0;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = NULL;
code = createBatchList(&pBatchList, key, ip, port);
if (code != 0) {
destroyReq(pReq);
return;
}
pBatchList->batchLenLimit = pInst->shareConnLimit;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
if (code != 0) {
destroyBatchList(pBatchList);
destroyReq(pReq);
return;
}
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) {
destroyBatchList(pBatchList);
}
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
cliDestroyBatch(pBatch);
}
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->shareConnLimit += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
code = createBatch(&tBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
}
}
}
}
return;
}
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
if (pBatchList == NULL) {
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatchList->wq);
pBatchList->port = port;
pBatchList->connMax = 1;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 0;
pBatchList->len += 1;
pBatchList->ip = taosStrdup(ip);
pBatchList->dst = taosStrdup(key);
if (pBatchList->ip == NULL || pBatchList->dst == NULL) {
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
*ppBatchList = pBatchList;
return 0;
}
static void destroyBatchList(SCliBatchList* pList) {
if (pList == NULL) {
return;
}
while (!QUEUE_IS_EMPTY(&pList->wq)) {
queue* h = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pList->ip);
taosMemoryFree(pList->dst);
taosMemoryFree(pList);
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
if (pBatch == NULL) {
tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, &pReq->q);
pBatch->wLen += 1;
pBatch->shareConnLimit = pReq->msg.contLen;
pBatch->pList = pList;
QUEUE_PUSH(&pList->wq, &pBatch->listq);
pList->len += 1;
*ppBatch = pBatch;
return 0;
}
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); }
static void cliAsyncCb(uv_async_t* handle) {
@ -2494,10 +2352,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
pThrd->destroyAhandleFp = pInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@ -2505,11 +2359,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->batchCache == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->connHeapCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
@ -2553,10 +2402,7 @@ _end:
transDQDestroy(pThrd->delayQueue, NULL);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->pIdConnTable);
taosArrayDestroy(pThrd->pQIdBuf);
@ -2580,7 +2426,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyReqAndAhanlde);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
@ -2592,24 +2437,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) {
SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
queue* h = QUEUE_HEAD(&pBatchList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL);
while (pIter2 != NULL) {
SHeap* heap = (SHeap*)(pIter2);

View File

@ -615,6 +615,21 @@ TEST_F(TransEnv, http) {
#endif
}
#if 1
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
for (int32_t i = 0; i < 1; i++) {
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
}
taosTelemetryDestroy(&mgt);
#endif
{
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "error");