From 85bf677ef4e33e4514e9e7396902557dacc6d504 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 20 Dec 2021 19:44:12 +0800 Subject: [PATCH] TD-10431 process create vnode msg in dnode module --- include/util/tthread.h | 4 +- source/dnode/mgmt/impl/src/dndVnodes.c | 18 ++- source/dnode/mnode/impl/inc/mndDef.h | 11 +- source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 10 +- source/dnode/mnode/impl/src/mndDb.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 177 ++++++++++++++---------- source/dnode/mnode/impl/src/mndVgroup.c | 6 +- source/dnode/mnode/impl/src/mnode.c | 2 +- source/dnode/mnode/sdb/src/sdbRaw.c | 4 +- source/util/src/tthread.c | 24 ++-- 11 files changed, 154 insertions(+), 105 deletions(-) diff --git a/include/util/tthread.h b/include/util/tthread.h index 0ff267dd1f..7a5fd1f4c8 100644 --- a/include/util/tthread.h +++ b/include/util/tthread.h @@ -24,8 +24,8 @@ extern "C" { #include "tdef.h" // create new thread -pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param); -// destory thread +pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param); +// destory thread bool taosDestoryThread(pthread_t* pthread); // thread running return true bool taosThreadRunning(pthread_t* pthread); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index eacec0a7af..cc4fc5b7d7 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -48,7 +48,7 @@ typedef struct { int32_t opened; int32_t failed; int32_t threadIndex; - pthread_t *pThreadId; + pthread_t thread; SDnode *pDnode; SWrapperCfg *pCfgs; } SVnodeThread; @@ -463,6 +463,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); for (int32_t t = 0; t < threadNum; ++t) { threads[t].threadIndex = t; + threads[t].pDnode = pDnode; threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg)); } @@ -478,16 +479,21 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { SVnodeThread *pThread = &threads[t]; if (pThread->vnodeNum == 0) continue; - pThread->pThreadId = taosCreateThread(dnodeOpenVnodeFunc, pThread); - if (pThread->pThreadId == NULL) { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) { dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); } + + pthread_attr_destroy(&thAttr); } for (int32_t t = 0; t < threadNum; ++t) { SVnodeThread *pThread = &threads[t]; - taosDestoryThread(pThread->pThreadId); - pThread->pThreadId = NULL; + if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { + pthread_join(pThread->thread, NULL); + } free(pThread->pCfgs); } free(threads); @@ -790,7 +796,7 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { break; } - SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; + SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 36b6725737..da54920574 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -61,15 +61,14 @@ typedef enum { } EAuthOp; typedef enum { - TRN_STAGE_PREPARE = 1, - TRN_STAGE_EXECUTE = 2, + TRN_STAGE_PREPARE = 0, + TRN_STAGE_EXECUTE = 1, + TRN_STAGE_ROLLBACK = 2, TRN_STAGE_COMMIT = 3, - TRN_STAGE_ROLLBACK = 4, - TRN_STAGE_RETRY = 5, - TRN_STAGE_OVER = 6, + TRN_STAGE_OVER = 4, } ETrnStage; -typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; +typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { DND_STATUS_OFFLINE = 0, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index e9913803bd..ba72d95537 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -70,6 +70,7 @@ typedef struct SMnode { tmr_h timer; char *path; SMnodeCfg cfg; + int64_t checkTime; SSdb *pSdb; SDnode *pDnode; SArray *pSteps; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 5b036f5ba2..a8d37ba655 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -25,6 +25,9 @@ extern "C" { typedef struct { SEpSet epSet; int8_t msgType; + int8_t msgSent; + int8_t msgReceived; + int32_t errCode; int32_t contLen; void *pCont; } STransAction; @@ -39,10 +42,13 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); + int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); -char *mndTransStageStr(ETrnStage stage); -char *mndTransPolicyStr(ETrnPolicy policy); +void mndTransHandleActionRsp(SMnodeMsg *pMsg); + +char *mndTransStageStr(ETrnStage stage); +char *mndTransPolicyStr(ETrnPolicy policy); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bde26631bb..108896c121 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -311,7 +311,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.pCont = pMsg; action.contLen = sizeof(SCreateVnodeMsg); - action.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN; + action.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index a91827ba0a..847f6bcd8b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -34,7 +34,7 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); -static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray); +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); @@ -336,10 +336,8 @@ char *mndTransStageStr(ETrnStage stage) { return "commit"; case TRN_STAGE_ROLLBACK: return "rollback"; - case TRN_STAGE_RETRY: - return "retry"; case TRN_STAGE_OVER: - return "stop"; + return "over"; default: return "undefined"; } @@ -381,7 +379,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return NULL; } - mDebug("trans:%d, data:%p is created", pTrans->id, pTrans); + mDebug("trans:%d, is created", pTrans->id); return pTrans; } @@ -410,7 +408,7 @@ void mndTransDrop(STrans *pTrans) { mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); - mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); + // mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); tfree(pTrans); } @@ -453,7 +451,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { } static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { - void *ptr = taosArrayPush(pArray, &pAction); + void *ptr = taosArrayPush(pArray, pAction); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -480,7 +478,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("trans:%d, start sync", pTrans->id); + mTrace("trans:%d, sync to other nodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -520,7 +518,7 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); if (taosArrayGetSize(pTrans->commitLogs) != 0) { - mTrace("trans:%d, start sync", pTrans->id); + mTrace("trans:%d, sync to other nodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -550,7 +548,7 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - mTrace("trans:%d, start sync", pTrans->id); + mTrace("trans:%d, sync to other nodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -583,6 +581,50 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) // todo } +void mndTransHandleActionRsp(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + int64_t sig = (int64_t)(pMsg->rpcMsg.ahandle); + int32_t transId = (int32_t)(sig >> 32); + int32_t action = (int32_t)((sig << 32) >> 32); + + STrans *pTrans = mndAcquireTrans(pMnode, transId); + if (pTrans == NULL) { + mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr()); + goto HANDLE_ACTION_RSP_OVER; + } + + SArray *pArray = NULL; + if (pTrans->stage == TRN_STAGE_EXECUTE) { + pArray = pTrans->redoActions; + } else if (pTrans->stage == TRN_STAGE_ROLLBACK) { + pArray = pTrans->undoActions; + } else { + } + + if (pArray == NULL) { + mError("trans:%d, invalid trans stage:%s", transId, mndTransStageStr(pTrans->stage)); + goto HANDLE_ACTION_RSP_OVER; + } + + int32_t actionNum = taosArrayGetSize(pTrans->redoActions); + if (action < 0 || action > actionNum) { + mError("trans:%d, invalid action:%d", transId, action); + goto HANDLE_ACTION_RSP_OVER; + } + + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction != NULL) { + pAction->msgReceived = 1; + pAction->errCode = pMsg->code; + } + + mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); + mndTransExecute(pMnode, pTrans); + +HANDLE_ACTION_RSP_OVER: + mndReleaseTrans(pMnode, pTrans); +} + static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { SSdb *pSdb = pMnode->pSdb; int32_t arraySize = taosArrayGetSize(pArray); @@ -605,7 +647,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { if (code != 0) { mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) } else { - mTrace("trans:%d, execute redo logs finished", pTrans->id) + mDebug("trans:%d, execute redo logs finished", pTrans->id) } } @@ -619,7 +661,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { if (code != 0) { mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) } else { - mTrace("trans:%d, execute undo logs finished", pTrans->id) + mDebug("trans:%d, execute undo logs finished", pTrans->id) } } @@ -633,47 +675,70 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { if (code != 0) { mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) } else { - mTrace("trans:%d, execute commit logs finished", pTrans->id) + mDebug("trans:%d, execute commit logs finished", pTrans->id) } } return code; } -static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) { -#if 0 - int32_t arraySize = taosArrayGetSize(pArray); - for (int32_t i = 0; i < arraySize; ++i) { - STransAction *pAction = taosArrayGet(pArray, i); +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { + int32_t numOfActions = taosArrayGetSize(pArray); + if (numOfActions == 0) return 0; - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen}; + for (int32_t action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction == NULL) continue; + if (pAction->msgSent) continue; + + int64_t signature = pTrans->id; + signature = (signature << 32); + signature += action; + + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .ahandle = (void *)signature}; rpcMsg.pCont = rpcMallocCont(pAction->contLen); if (rpcMsg.pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + + pAction->msgSent = 1; + pAction->msgReceived = 0; + pAction->errCode = 0; + + mDebug("trans:%d, action:%d is sent", pTrans->id, action); mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -#else - return 0; -#endif + int32_t numOfReceivedMsgs = 0; + int32_t errorCode = 0; + for (int32_t action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction == NULL) continue; + if (pAction->msgSent && pAction->msgReceived) { + numOfReceivedMsgs++; + if (pAction->errCode != 0) { + errorCode = pAction->errCode; + } + } + } + + if (numOfReceivedMsgs == numOfActions) { + mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errorCode); + terrno = errorCode; + return errorCode; + } else { + return TSDB_CODE_MND_ACTION_IN_PROGRESS; + } } static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0; - - mTrace("trans:%d, start to execute redo actions", pTrans->id); - return mndTransExecuteActions(pMnode, pTrans->redoActions); + return mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); } static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0; - - mTrace("trans:%d, start to execute undo actions", pTrans->id); - return mndTransExecuteActions(pMnode, pTrans->undoActions); + return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); } static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { @@ -681,7 +746,7 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->stage = TRN_STAGE_EXECUTE; - mTrace("trans:%d, stage from prepare to execute", pTrans->id); + mDebug("trans:%d, stage from prepare to execute", pTrans->id); } else { pTrans->stage = TRN_STAGE_ROLLBACK; mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); @@ -695,17 +760,17 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->stage = TRN_STAGE_COMMIT; - mTrace("trans:%d, stage from execute to commit", pTrans->id); + mDebug("trans:%d, stage from execute to commit", pTrans->id); } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code)); + mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code)); return code; } else { if (pTrans->policy == TRN_POLICY_ROLLBACK) { pTrans->stage = TRN_STAGE_ROLLBACK; mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); } else { - pTrans->stage = TRN_STAGE_RETRY; - mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr()); + pTrans->stage = TRN_STAGE_EXECUTE; + mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr()); } } @@ -713,29 +778,16 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); - - if (code == 0) { - pTrans->stage = TRN_STAGE_OVER; - mTrace("trans:%d, commit stage finished", pTrans->id); - } else { - if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr()); - } else { - pTrans->stage = TRN_STAGE_RETRY; - mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr()); - } - } - - return code; + mndTransExecuteCommitLogs(pMnode, pTrans); + pTrans->stage = TRN_STAGE_OVER; + return 0; } static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - mTrace("trans:%d, rollbacked", pTrans->id); + mDebug("trans:%d, rollbacked", pTrans->id); } else { pTrans->stage = TRN_STAGE_ROLLBACK; mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); @@ -744,20 +796,6 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { return code; } -static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); - - if (code == 0) { - pTrans->stage = TRN_STAGE_COMMIT; - mTrace("trans:%d, stage from retry to commit", pTrans->id); - } else { - pTrans->stage = TRN_STAGE_RETRY; - mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr()); - } - - return code; -} - static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; @@ -772,7 +810,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { case TRN_STAGE_COMMIT: code = mndTransCommit(pMnode, pTrans); if (code == 0) { - code = mndTransPerformCommitStage(pMnode, pTrans); + mndTransPerformCommitStage(pMnode, pTrans); } break; case TRN_STAGE_ROLLBACK: @@ -781,9 +819,6 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { code = mndTransRollback(pMnode, pTrans); } break; - case TRN_STAGE_RETRY: - code = mndTransPerformRetryStage(pMnode, pTrans); - break; default: mndTransSendRpcRsp(pTrans, 0); return; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 1d62f40d10..eeca8c9546 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -311,7 +311,11 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { return 0; } -static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 656fe322cd..fb0b95dc4a 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -225,7 +225,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { } return 0; -} +} SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { mDebug("start to open mnode in %s", path); diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index e37559808e..5a0020199f 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { pRaw->sver = sver; pRaw->dataLen = dataLen; - mTrace("raw:%p, is created, len:%d", pRaw, dataLen); + // mTrace("raw:%p, is created, len:%d", pRaw, dataLen); return pRaw; } void sdbFreeRaw(SSdbRaw *pRaw) { - mTrace("raw:%p, is freed", pRaw); + // mTrace("raw:%p, is freed", pRaw); free(pRaw); } diff --git a/source/util/src/tthread.c b/source/util/src/tthread.c index 5ed7fb5aa0..44fce1c882 100644 --- a/source/util/src/tthread.c +++ b/source/util/src/tthread.c @@ -13,16 +13,16 @@ * along with this program. If not, see . */ -#include "os.h" #include "tthread.h" +#include "os.h" +#include "taoserror.h" #include "tdef.h" #include "tutil.h" #include "ulog.h" -#include "taoserror.h" // create new thread -pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) { - pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t)); +pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param) { + pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t)); pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -36,26 +36,24 @@ pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) { return pthread; } -// destory thread +// destory thread bool taosDestoryThread(pthread_t* pthread) { - if(pthread == NULL) return false; - if(taosThreadRunning(pthread)) { + if (pthread == NULL) return false; + if (taosThreadRunning(pthread)) { pthread_cancel(*pthread); pthread_join(*pthread, NULL); } - + free(pthread); return true; } // thread running return true bool taosThreadRunning(pthread_t* pthread) { - if(pthread == NULL) return false; + if (pthread == NULL) return false; int ret = pthread_kill(*pthread, 0); - if(ret == ESRCH) - return false; - if(ret == EINVAL) - return false; + if (ret == ESRCH) return false; + if (ret == EINVAL) return false; // alive return true; }