diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 353b103792..5ba6c10abf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3047,7 +3047,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { ENCODESQL(); - if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1; tEndEncode(&encoder); @@ -3162,7 +3162,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; ENCODESQL(); - if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; diff --git a/source/dnode/mnode/impl/inc/mndArbGroup.h b/source/dnode/mnode/impl/inc/mndArbGroup.h index ed852cf581..fcd11310e7 100644 --- a/source/dnode/mnode/impl/inc/mndArbGroup.h +++ b/source/dnode/mnode/impl/inc/mndArbGroup.h @@ -35,6 +35,7 @@ int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup); int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup); int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup); +int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup); int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup); bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId, diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index e056e698f3..92ab5274e4 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -260,6 +260,14 @@ int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) { return 0; } +int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) { + SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + return 0; +} + static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) { SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup); if (pRedoRaw == NULL) return -1; @@ -535,10 +543,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { int32_t vgId = arbGroupDup.vgId; int64_t nowMs = taosGetTimestampMs(); - bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); - bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); - SArbAssignedLeader* pAssignedLeader = &arbGroupDup.assignedLeader; - int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; + bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); + bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); + SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader; + int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; // 1. has assigned && is sync => send req if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) { @@ -667,9 +675,16 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE); newGroup.version = req.version; - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; + SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); + if (!pOldGroup) { + mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId); + return 0; + } + sdbRelease(pMnode->pSdb, pOldGroup); + if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) { - mError("vgId:%d, arb failed to update arbgroup, since %s", req.vgId, terrstr()); + mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr()); ret = -1; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a1f3a24661..527105a7b8 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1209,6 +1209,25 @@ static int32_t mndSetDropDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SArbGroup *pArbGroup = NULL; + pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); + if (pIter == NULL) break; + + if (pArbGroup->dbUid == pDb->uid) { + if (mndSetDropArbGroupPrepareLogs(pTrans,pArbGroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pArbGroup); + return -1; + } + } + + sdbRelease(pSdb, pArbGroup); + } + return 0; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 874c9cf6fe..381d710b3e 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1158,14 +1158,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } else if (createReq.tagVer > 0 || createReq.colVer > 0) { int32_t tagDelta = createReq.tagVer - pStb->tagVer; int32_t colDelta = createReq.colVer - pStb->colVer; - int32_t verDelta = tagDelta + colDelta; mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d", createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer); if (tagDelta <= 0 && colDelta <= 0) { mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name); code = 0; goto _OVER; - } else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) { + } else if ((tagDelta == 1 && colDelta == 0) || + (tagDelta == 0 && colDelta == 1) || + (pStb->colVer == 1 && createReq.colVer > 1) || + (pStb->tagVer == 1 && createReq.tagVer > 1)) { isAlter = true; mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); } else { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 60b522f6fa..6067af199e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2153,41 +2153,60 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); - return -1; + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } } - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); - doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(pList, req.taskId, req.streamId, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks); } int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - // TODO:handle error - int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + if (pStream != NULL) { // TODO:handle error + int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + } else { + // todo: wait for the create stream trans completed, and launch the checkpoint trans + // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + // sleep(500ms) + } // remove this entry taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); } - mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ffebd783ac..ccfc8cc7c9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -101,10 +101,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - return -1; - } - + /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index eae9120ddd..5e853adf97 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -730,8 +730,8 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { return &pIter->cv; } - if (pIter->iColData < pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv); + if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); ++pIter->iColData; return &pIter->cv; } else { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 69a90f03ed..a5e27e1910 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -879,7 +879,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in char* outputWKT = NULL; if (nGeom == 0) { - if (!(*output = strdup(""))) code = TSDB_CODE_OUT_OF_MEMORY; + if (!(*output = taosStrdup(""))) code = TSDB_CODE_OUT_OF_MEMORY; *nOutput = 0; return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5f6440c06d..c24763c024 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -648,14 +648,17 @@ SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) { } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { + int32_t taskId = pTask->id.taskId; int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); + + // not safe to use the pTask->id.idStr, since pTask may be released by other threads when print logs. if (ref > 0) { - stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); + stTrace("s-task:0x%x release task, ref:%d", taskId, ref); } else if (ref == 0) { - stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); + stTrace("s-task:0x%x all refs are gone, free it", taskId); tFreeStreamTask(pTask); } else if (ref < 0) { - stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); + stError("task ref is invalid, ref:%d, 0x%x", ref, taskId); } } @@ -824,13 +827,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { return chkpId; } -static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - taosArrayDestroy(pRecycleList); -} - int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; void* pKey = NULL; @@ -847,10 +843,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); + int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL); + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - return -1; + return TSDB_CODE_SUCCESS; } tdbTbcMoveToFirst(pCur); @@ -859,20 +856,18 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno)); - doClear(pKey, pVal, pCur, pRecycleList); - return -1; + break; } tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); - doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); stError( "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "stream manually", vgId, tsDataDir); - return -1; + break; } tDecoderClear(&decoder); @@ -892,10 +887,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { - doClear(pKey, pVal, pCur, pRecycleList); + code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); + if (code < 0) { + stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); - return -1; + continue; } taosArrayPush(pMeta->pTaskList, &pTask->id); @@ -907,9 +903,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { - doClear(pKey, pVal, pCur, pRecycleList); + stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); + taosArrayPop(pMeta->pTaskList); tFreeStreamTask(pTask); - return -1; + continue; } if (pTask->info.fillHistory == 0) { @@ -925,10 +922,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); + if (tdbTbcClose(pCur) < 0) { - stError("vgId:%d failed to close meta-file cursor", vgId); - taosArrayDestroy(pRecycleList); - return -1; + stError("vgId:%d failed to close meta-file cursor, code:%s, continue", vgId, tstrerror(terrno)); } if (taosArrayGetSize(pRecycleList) > 0) { @@ -942,8 +938,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); + taosArrayDestroy(pRecycleList); - return 0; + return TSDB_CODE_SUCCESS; } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4ca784a32f..c7a1a00a46 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -216,7 +216,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; @@ -287,7 +287,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + } if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; tEndDecode(pDecoder); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d40fff447f..3543ed574c 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -866,9 +866,14 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn SyncTerm term = -1; SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); + errno = 0; if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1); + if (term < 0 && (errno == ENFILE || errno == EMFILE)) { + sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1); + return -1; + } if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index cd5b7188ad..99af5e520e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -585,11 +585,12 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; STrans* pTranInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + size_t klen = strlen(key); + SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key)); + taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -624,11 +625,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + size_t klen = strlen(key); + SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key)); + taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -1471,7 +1473,8 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { } static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { uint32_t addr = 0; - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); + size_t len = strlen(fqdn); + uint32_t* v = taosHashGet(cache, fqdn, len); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { @@ -1480,7 +1483,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) return addr; } - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); } else { addr = *v; } @@ -1490,13 +1493,14 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later uint32_t addr = taosGetIpv4FromFqdn(fqdn); if (addr != 0xffffffff) { - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); + size_t len = strlen(fqdn); + uint32_t* v = taosHashGet(cache, fqdn, len); if (addr != *v) { char old[64] = {0}, new[64] = {0}; tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); } } return; @@ -1537,21 +1541,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } - // if (rpcDebugFlag & DEBUG_TRACE) { - // if (tmsgIsValid(pMsg->msg.msgType)) { - // char buf[128] = {0}; - // sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); - // int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - // if (NULL == 0) { - // int localCount = 1; - // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - // } else { - // int localCount = *count + 1; - // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - // } - // } - // } - char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; @@ -1704,9 +1693,8 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key)); + size_t klen = strlen(key); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); QUEUE_INIT(&pBatchList->wq); @@ -1730,7 +1718,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - taosHashPut(pThrd->batchCache, key, strlen(key), &pBatchList, sizeof(void*)); + taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); @@ -1800,21 +1788,6 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - // if (rpcDebugFlag & DEBUG_TRACE) { - // void* pIter = taosHashIterate(pThrd->msgCount, NULL); - // while (pIter != NULL) { - // int* count = pIter; - // size_t len = 0; - // char* key = taosHashGetKey(pIter, &len); - // if (*count != 0) { - // tDebug("key: %s count: %d", key, *count); - // } - - // pIter = taosHashIterate(pThrd->msgCount, pIter); - // } - // tDebug("all conn count: %d", pThrd->newConnCount); - // } - int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { cliNoBatchDealReq(&wq, pThrd); @@ -1971,8 +1944,9 @@ static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { if (pMsg == NULL) { return; } - if (param != NULL) { - SCliThrd* pThrd = param; + + SCliThrd* pThrd = param; + if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); } destroyCmsg(pMsg); @@ -1984,12 +1958,9 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; - tDebug("destroy Ahandle A"); - if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - tDebug("destroy Ahandle B"); + if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { pThrd->destroyAhandleFp(pMsg->ctx->ahandle); } - tDebug("destroy Ahandle C"); transDestroyConnCtx(pMsg->ctx); transFreeMsg(pMsg->msg.pCont); @@ -2411,20 +2382,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } - // if (rpcDebugFlag & DEBUG_TRACE) { - // if (tmsgIsValid(pResp->msgType - 1)) { - // char buf[128] = {0}; - // sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); - // int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - // if (NULL == 0) { - // int localCount = 0; - // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - // } else { - // int localCount = *count - 1; - // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - // } - // } - // } if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pSem) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 138d4bc1f4..0712010458 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -19,7 +19,7 @@ #include "tgeosctx.h" #include "tlog.h" -#define QUEUE_THRESHOLD 1000 * 1000 +#define QUEUE_THRESHOLD (1000 * 1000) typedef void *(*ThreadFp)(void *param); diff --git a/source/util/test/tbaseCodecTest.cpp b/source/util/test/tbaseCodecTest.cpp index 4c56979885..63bbfcaa68 100644 --- a/source/util/test/tbaseCodecTest.cpp +++ b/source/util/test/tbaseCodecTest.cpp @@ -17,11 +17,6 @@ using namespace std; #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -int main(int argc, char **argv) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} - static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) { int64_t start = taosGetTimestampUs(); char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);