Merge branch '3.0' into feature/compressData

This commit is contained in:
yihaoDeng 2024-03-30 02:00:24 +00:00
commit 0cce8df6b5
15 changed files with 136 additions and 127 deletions

View File

@ -3047,7 +3047,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
ENCODESQL();
if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1;
tEndEncode(&encoder);
@ -3162,7 +3162,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
ENCODESQL();
if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;

View File

@ -35,6 +35,7 @@ int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup);
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup);
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup);
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,

View File

@ -260,6 +260,14 @@ int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
return 0;
}
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0;
}
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
if (pRedoRaw == NULL) return -1;
@ -535,10 +543,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
int32_t vgId = arbGroupDup.vgId;
int64_t nowMs = taosGetTimestampMs();
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader* pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && is sync => send req
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) {
@ -667,9 +675,16 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.version = req.version;
SMnode *pMnode = pReq->info.node;
SMnode *pMnode = pReq->info.node;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
if (!pOldGroup) {
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
return 0;
}
sdbRelease(pMnode->pSdb, pOldGroup);
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
mError("vgId:%d, arb failed to update arbgroup, since %s", req.vgId, terrstr());
mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr());
ret = -1;
}

View File

@ -1209,6 +1209,25 @@ static int32_t mndSetDropDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SArbGroup *pArbGroup = NULL;
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break;
if (pArbGroup->dbUid == pDb->uid) {
if (mndSetDropArbGroupPrepareLogs(pTrans,pArbGroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pArbGroup);
return -1;
}
}
sdbRelease(pSdb, pArbGroup);
}
return 0;
}

View File

@ -1158,14 +1158,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
} else if (createReq.tagVer > 0 || createReq.colVer > 0) {
int32_t tagDelta = createReq.tagVer - pStb->tagVer;
int32_t colDelta = createReq.colVer - pStb->colVer;
int32_t verDelta = tagDelta + colDelta;
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
if (tagDelta <= 0 && colDelta <= 0) {
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
code = 0;
goto _OVER;
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
} else if ((tagDelta == 1 && colDelta == 0) ||
(tagDelta == 0 && colDelta == 1) ||
(pStb->colVer == 1 && createReq.colVer > 1) ||
(pStb->tagVer == 1 && createReq.tagVer > 1)) {
isAlter = true;
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
} else {

View File

@ -2153,41 +2153,60 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId);
return -1;
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
return -1;
} else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId);
}
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(int32_t));
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
}
int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
int64_t checkpointId = mndStreamGenChkpId(pMnode);
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
// TODO:handle error
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
if (pStream != NULL) { // TODO:handle error
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
} else {
// todo: wait for the create stream trans completed, and launch the checkpoint trans
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
// sleep(500ms)
}
// remove this entry
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
}
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
mndReleaseStream(pMnode, pStream);
taosThreadMutexUnlock(&execInfo.lock);
{

View File

@ -101,10 +101,7 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
return -1;
}
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
return 0;
}

View File

@ -730,8 +730,8 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
return &pIter->cv;
}
if (pIter->iColData < pIter->pRow->pBlockData->nColData) {
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv);
if (pIter->iColData <= pIter->pRow->pBlockData->nColData) {
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv);
++pIter->iColData;
return &pIter->cv;
} else {

View File

@ -879,7 +879,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in
char* outputWKT = NULL;
if (nGeom == 0) {
if (!(*output = strdup(""))) code = TSDB_CODE_OUT_OF_MEMORY;
if (!(*output = taosStrdup(""))) code = TSDB_CODE_OUT_OF_MEMORY;
*nOutput = 0;
return code;
}

View File

@ -648,14 +648,17 @@ SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) {
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
int32_t taskId = pTask->id.taskId;
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
// not safe to use the pTask->id.idStr, since pTask may be released by other threads when print logs.
if (ref > 0) {
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
stTrace("s-task:0x%x release task, ref:%d", taskId, ref);
} else if (ref == 0) {
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
stTrace("s-task:0x%x all refs are gone, free it", taskId);
tFreeStreamTask(pTask);
} else if (ref < 0) {
stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
stError("task ref is invalid, ref:%d, 0x%x", ref, taskId);
}
}
@ -824,13 +827,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
return chkpId;
}
static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
void* pKey = NULL;
@ -847,10 +843,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList);
return -1;
return TSDB_CODE_SUCCESS;
}
tdbTbcMoveToFirst(pCur);
@ -859,20 +856,18 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
doClear(pKey, pVal, pCur, pRecycleList);
return -1;
break;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder);
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream manually",
vgId, tsDataDir);
return -1;
break;
}
tDecoderClear(&decoder);
@ -892,10 +887,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) {
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask);
return -1;
continue;
}
taosArrayPush(pMeta->pTaskList, &pTask->id);
@ -907,9 +903,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask);
return -1;
continue;
}
if (pTask->info.fillHistory == 0) {
@ -925,10 +922,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
stError("vgId:%d failed to close meta-file cursor", vgId);
taosArrayDestroy(pRecycleList);
return -1;
stError("vgId:%d failed to close meta-file cursor, code:%s, continue", vgId, tstrerror(terrno));
}
if (taosArrayGetSize(pRecycleList) > 0) {
@ -942,8 +938,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {

View File

@ -216,7 +216,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
@ -287,7 +287,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder);

View File

@ -866,9 +866,14 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
SyncTerm term = -1;
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
errno = 0;
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1);
if (term < 0 && (errno == ENFILE || errno == EMFILE)) {
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1);
return -1;
}
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);

View File

@ -585,11 +585,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
void* pool = pThrd->pool;
STrans* pTranInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
size_t klen = strlen(key);
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, klen);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@ -624,11 +625,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
void* pool = pThrd->pool;
STrans* pTransInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
size_t klen = strlen(key);
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, klen);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@ -1471,7 +1473,8 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
}
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
uint32_t addr = 0;
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
size_t len = strlen(fqdn);
uint32_t* v = taosHashGet(cache, fqdn, len);
if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) {
@ -1480,7 +1483,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
return addr;
}
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
taosHashPut(cache, fqdn, len, &addr, sizeof(addr));
} else {
addr = *v;
}
@ -1490,13 +1493,14 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
// impl later
uint32_t addr = taosGetIpv4FromFqdn(fqdn);
if (addr != 0xffffffff) {
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
size_t len = strlen(fqdn);
uint32_t* v = taosHashGet(cache, fqdn, len);
if (addr != *v) {
char old[64] = {0}, new[64] = {0};
tinet_ntoa(old, *v);
tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
}
}
return;
@ -1537,21 +1541,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return;
}
// if (rpcDebugFlag & DEBUG_TRACE) {
// if (tmsgIsValid(pMsg->msg.msgType)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count + 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
// }
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
char addr[TSDB_FQDN_LEN + 64] = {0};
@ -1704,9 +1693,8 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
// SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
QUEUE_INIT(&pBatchList->wq);
@ -1730,7 +1718,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
taosHashPut(pThrd->batchCache, key, strlen(key), &pBatchList, sizeof(void*));
taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
@ -1800,21 +1788,6 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
// if (rpcDebugFlag & DEBUG_TRACE) {
// void* pIter = taosHashIterate(pThrd->msgCount, NULL);
// while (pIter != NULL) {
// int* count = pIter;
// size_t len = 0;
// char* key = taosHashGetKey(pIter, &len);
// if (*count != 0) {
// tDebug("key: %s count: %d", key, *count);
// }
// pIter = taosHashIterate(pThrd->msgCount, pIter);
// }
// tDebug("all conn count: %d", pThrd->newConnCount);
// }
int8_t supportBatch = pTransInst->supportBatch;
if (supportBatch == 0) {
cliNoBatchDealReq(&wq, pThrd);
@ -1971,8 +1944,9 @@ static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
if (pMsg == NULL) {
return;
}
if (param != NULL) {
SCliThrd* pThrd = param;
SCliThrd* pThrd = param;
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) {
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
}
destroyCmsg(pMsg);
@ -1984,12 +1958,9 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
tDebug("destroy Ahandle A");
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
tDebug("destroy Ahandle B");
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
}
tDebug("destroy Ahandle C");
transDestroyConnCtx(pMsg->ctx);
transFreeMsg(pMsg->msg.pCont);
@ -2411,20 +2382,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}
// if (rpcDebugFlag & DEBUG_TRACE) {
// if (tmsgIsValid(pResp->msgType - 1)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 0;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count - 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
// }
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pSem) {

View File

@ -19,7 +19,7 @@
#include "tgeosctx.h"
#include "tlog.h"
#define QUEUE_THRESHOLD 1000 * 1000
#define QUEUE_THRESHOLD (1000 * 1000)
typedef void *(*ThreadFp)(void *param);

View File

@ -17,11 +17,6 @@ using namespace std;
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) {
int64_t start = taosGetTimestampUs();
char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);