From b03ed5253def8a6f78041f01d77db872549dbdd4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Jul 2020 14:16:01 +0000 Subject: [PATCH 1/7] [TD-860] change sync confirm in sdb --- src/dnode/src/dnodeMWrite.c | 6 +- src/mnode/src/mnodeSdb.c | 112 +++++++++++++++++------------------- 2 files changed, 56 insertions(+), 62 deletions(-) diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 90d857155a..a6aed29e3b 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { taosFreeQitem(pWrite); } -void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { - SMnodeMsg *pWrite = pRaw; +void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { + SMnodeMsg *pWrite = pMsg; if (pWrite == NULL) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { @@ -140,6 +140,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { return; } + if (code > 0) return; + SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rpcRsp.rsp, diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 761dce6720..cdcb426ba2 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -72,8 +72,6 @@ typedef struct { void * sync; void * wal; SSyncCfg cfg; - sem_t sem; - int32_t code; int32_t numOfTables; SSdbTable *tableList[SDB_TABLE_MAX]; pthread_mutex_t mutex; @@ -244,27 +242,19 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { sdbUpdateMnodeRoles(); } +FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - tsSdbObj.code = code; - sem_post(&tsSdbObj.sem); - sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); -} + SMnodeMsg *pMsg = param; - static int32_t sdbForwardToPeer(SWalHead *pHead) { - if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; - - int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); - if (code > 0) { - sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); - sem_wait(&tsSdbObj.sem); - return tsSdbObj.code; - } - return code; + if (pMsg) { + sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + } + dnodeSendRpcMnodeWriteRsp(pMsg, code); } void sdbUpdateSync() { SSyncCfg syncCfg = {0}; - int32_t index = 0; + int32_t index = 0; SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); for (int32_t i = 0; i < mnodes->nodeNum; ++i) { @@ -298,7 +288,7 @@ void sdbUpdateSync() { } syncCfg.replica = index; - syncCfg.quorum = (syncCfg.replica == 1) ? 1:2; + syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; bool hasThisDnode = false; for (int32_t i = 0; i < syncCfg.replica; ++i) { @@ -325,10 +315,10 @@ void sdbUpdateSync() { syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.writeToCache = sdbWriteToQueue; - syncInfo.confirmForward = sdbConfirmForward; + syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; tsSdbObj.cfg = syncCfg; - + if (tsSdbObj.sync) { syncReconfig(tsSdbObj.sync, &syncCfg); } else { @@ -339,7 +329,6 @@ void sdbUpdateSync() { int32_t sdbInit() { pthread_mutex_init(&tsSdbObj.mutex, NULL); - sem_init(&tsSdbObj.sem, 0, 0); if (sdbInitWriteWorker() != 0) { return -1; @@ -379,7 +368,6 @@ void sdbCleanUp() { tsSdbObj.wal = NULL; } - sem_destroy(&tsSdbObj.sem); pthread_mutex_destroy(&tsSdbObj.mutex); } @@ -513,24 +501,22 @@ static int sdbWrite(void *param, void *data, int type) { assert(pTable != NULL); pthread_mutex_lock(&tsSdbObj.mutex); + if (pHead->version == 0) { - // assign version + // assign version tsSdbObj.version++; pHead->version = tsSdbObj.version; } else { // for data from WAL or forward, version may be smaller if (pHead->version <= tsSdbObj.version) { pthread_mutex_unlock(&tsSdbObj.mutex); - if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) { - sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version); - syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); - } + sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_SUCCESS; } else if (pHead->version != tsSdbObj.version + 1) { pthread_mutex_unlock(&tsSdbObj.mutex); - sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, - tsSdbObj.version); + sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_MND_APP_ERROR; } else { tsSdbObj.version = pHead->version; @@ -543,27 +529,33 @@ static int sdbWrite(void *param, void *data, int type) { return code; } - code = sdbForwardToPeer(pHead); + + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync + void *mhandle = NULL; + if (pOper != NULL) mhandle = pOper->pMsg; + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, mhandle, TAOS_QTYPE_RPC); pthread_mutex_unlock(&tsSdbObj.mutex); + if (syncCode < 0) { + sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + return syncCode; + } else if (syncCode > 0) { + sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else {} + // from app, oper is created if (pOper != NULL) { - sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s", - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, - tstrerror(code)); - return code; + sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + return syncCode; + } else { + sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } // from wal or forward msg, oper not created, should add into hash - if (tsSdbObj.sync != NULL) { - sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it", - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - syncConfirmForward(tsSdbObj.sync, pHead->version, code); - } else { - sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } - if (action == SDB_ACTION_INSERT) { SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; code = (*pTable->decodeFp)(&oper); @@ -944,17 +936,16 @@ static void *sdbWorkerFp(void *param) { if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + if (pOper->pMsg != NULL) { + sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", + pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, + sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); + } } else { pHead = (SWalHead *)item; pOper = NULL; } - if (pOper != NULL && pOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", - pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, - sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); - } - int32_t code = sdbWrite(pOper, pHead, type); if (pOper) pOper->retCode = code; } @@ -965,23 +956,24 @@ static void *sdbWorkerFp(void *param) { taosResetQitems(tsSdbWriteQall); for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(tsSdbWriteQall, &type, &item); + if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper != NULL && pOper->cb != NULL) { + if (pOper == NULL) { + taosFreeQitem(item); + continue; + } + + if (pOper->cb != NULL) { sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); } - if (pOper != NULL && pOper->pMsg != NULL) { - sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, - tstrerror(pOper->retCode)); - } - - if (pOper != NULL) { - sdbDecRef(pOper->table, pOper->pObj); - } - dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); + sdbDecRef(pOper->table, pOper->pObj); + } else if (type == TAOS_QTYPE_FWD) { + syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); + } else { } taosFreeQitem(item); } From a2e97d8ec29e6bd29087428fcf64baf07ba2384f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 03:36:59 +0000 Subject: [PATCH 2/7] [TD-860] --- src/dnode/src/dnodeMWrite.c | 2 -- src/mnode/src/mnodeSdb.c | 48 ++++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index a6aed29e3b..b53c66e00c 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -140,8 +140,6 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { return; } - if (code > 0) return; - SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rpcRsp.rsp, diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index cdcb426ba2..70354cf550 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -244,12 +244,20 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - SMnodeMsg *pMsg = param; + if (code > 0) return; - if (pMsg) { - sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + assert(param); + + SSdbOper * pOper = param; + SMnodeMsg *pMsg = pOper->pMsg; + + if (pOper->cb != NULL) { + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); + pOper->retCode = (*pOper->cb)(pMsg, code); } - dnodeSendRpcMnodeWriteRsp(pMsg, code); + + dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + taosFreeQitem(pOper); } void sdbUpdateSync() { @@ -529,7 +537,6 @@ static int sdbWrite(void *param, void *data, int type) { return code; } - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync void *mhandle = NULL; if (pOper != NULL) mhandle = pOper->pMsg; @@ -541,18 +548,19 @@ static int sdbWrite(void *param, void *data, int type) { tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); return syncCode; } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } else {} + sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName, + syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + } // from app, oper is created if (pOper != NULL) { - sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); return syncCode; } else { - sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } // from wal or forward msg, oper not created, should add into hash @@ -619,7 +627,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -669,7 +677,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -719,7 +727,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -964,18 +972,14 @@ static void *sdbWorkerFp(void *param) { continue; } - if (pOper->cb != NULL) { - sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); - pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); - } - - dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); sdbDecRef(pOper->table, pOper->pObj); + sdbConfirmForward(NULL, pOper, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) { syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); + taosFreeQitem(item); } else { + taosFreeQitem(item); } - taosFreeQitem(item); } } From 94e50c413ea0f811f484b1badc794ebe62076598 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 03:54:18 +0000 Subject: [PATCH 3/7] [TD-860] --- src/mnode/src/mnodeSdb.c | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 70354cf550..0f657bdde8 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -244,19 +244,26 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - if (code > 0) return; - assert(param); - SSdbOper * pOper = param; SMnodeMsg *pMsg = pOper->pMsg; - if (pOper->cb != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); - pOper->retCode = (*pOper->cb)(pMsg, code); + if (code > 0) { + if (pMsg != NULL) { + sdbDebug("app:%p:%p, waiting for slave to confirm this operation", pMsg->rpcMsg.ahandle, pMsg); + } + return; } - dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + if (pMsg != NULL) { + sdbDebug("app:%p:%p, is confirmed and will do callback func, code:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + } + + if (pOper->cb != NULL) { + code = (*pOper->cb)(pMsg, code); + } + + dnodeSendRpcMnodeWriteRsp(pMsg, code); taosFreeQitem(pOper); } @@ -538,9 +545,7 @@ static int sdbWrite(void *param, void *data, int type) { } // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - void *mhandle = NULL; - if (pOper != NULL) mhandle = pOper->pMsg; - int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, mhandle, TAOS_QTYPE_RPC); + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); pthread_mutex_unlock(&tsSdbObj.mutex); if (syncCode < 0) { From e35d89a4b47a2a315d463b32e381e0416735e5af Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 09:56:20 +0000 Subject: [PATCH 4/7] [TD-860] add processed count for sdb sync --- src/mnode/inc/mnodeSdb.h | 1 + src/mnode/src/mnodeSdb.c | 54 ++++++++++++++++++++-------------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index ca2fffe24c..eec6d45e23 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -53,6 +53,7 @@ typedef struct { void * rowData; int32_t rowSize; int32_t retCode; // for callback in sdb queue + int32_t processedCount; // for sync fwd callback int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); struct SMnodeMsg *pMsg; } SSdbOper; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 0f657bdde8..4aed820958 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -247,20 +247,22 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { assert(param); SSdbOper * pOper = param; SMnodeMsg *pMsg = pOper->pMsg; + if (code <= 0) pOper->retCode = code; - if (code > 0) { + int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1); + if (processedCount <= 1) { if (pMsg != NULL) { - sdbDebug("app:%p:%p, waiting for slave to confirm this operation", pMsg->rpcMsg.ahandle, pMsg); + sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount); } return; } if (pMsg != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func, code:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); } if (pOper->cb != NULL) { - code = (*pOper->cb)(pMsg, code); + code = (*pOper->cb)(pMsg, pOper->retCode); } dnodeSendRpcMnodeWriteRsp(pMsg, code); @@ -543,31 +545,34 @@ static int sdbWrite(void *param, void *data, int type) { pthread_mutex_unlock(&tsSdbObj.mutex); return code; } - - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - pthread_mutex_unlock(&tsSdbObj.mutex); - if (syncCode < 0) { - sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - return syncCode; - } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName, - syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } else { - } + pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created if (pOper != NULL) { - sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + // forward to peers + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + if (syncCode <= 0) atomic_add_fetch_32(&pOper->processedCount, 1); + + if (syncCode < 0) { + sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else if (syncCode > 0) { + sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } return syncCode; - } else { - sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + + // even it is WAL/FWD, it shall be called to update version in sync + syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + // from wal or forward msg, oper not created, should add into hash if (action == SDB_ACTION_INSERT) { SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; @@ -972,11 +977,6 @@ static void *sdbWorkerFp(void *param) { if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper == NULL) { - taosFreeQitem(item); - continue; - } - sdbDecRef(pOper->table, pOper->pObj); sdbConfirmForward(NULL, pOper, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) { From 4f13c53acfc8c10afc84ea00344fdae4e79453fa Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 10:36:57 +0000 Subject: [PATCH 5/7] [TD-860] ret code for sdb --- src/mnode/src/mnodeSdb.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 4aed820958..9f46a77483 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -262,10 +262,10 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { } if (pOper->cb != NULL) { - code = (*pOper->cb)(pMsg, pOper->retCode); + pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode); } - dnodeSendRpcMnodeWriteRsp(pMsg, code); + dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); taosFreeQitem(pOper); } @@ -552,7 +552,7 @@ static int sdbWrite(void *param, void *data, int type) { if (pOper != NULL) { // forward to peers int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - if (syncCode <= 0) atomic_add_fetch_32(&pOper->processedCount, 1); + if (syncCode > 0) pOper->processedCount = 0; if (syncCode < 0) { sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, @@ -953,6 +953,7 @@ static void *sdbWorkerFp(void *param) { taosGetQitem(tsSdbWriteQall, &type, &item); if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; + pOper->processedCount = 1; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; if (pOper->pMsg != NULL) { sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", @@ -965,7 +966,7 @@ static void *sdbWorkerFp(void *param) { } int32_t code = sdbWrite(pOper, pHead, type); - if (pOper) pOper->retCode = code; + if (pOper && code <= 0) pOper->retCode = code; } walFsync(tsSdbObj.wal); From 453715c789546e888d5614f4a4658e232ccd2c7f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 11:10:52 +0000 Subject: [PATCH 6/7] [TD-860] definite lost while update dnode --- src/mnode/src/mnodeDnode.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 4707616ba8..804a64be08 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { } static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) { - SDnodeObj *pDnode = pOper->pObj; - SDnodeObj *pSaved = mnodeGetDnode(pDnode->dnodeId); - if (pSaved != NULL && pDnode != pSaved) { - memcpy(pSaved, pDnode, pOper->rowSize); - free(pDnode); - mnodeDecDnodeRef(pSaved); + SDnodeObj *pNew = pOper->pObj; + SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId); + if (pDnode != NULL && pNew != pDnode) { + memcpy(pDnode, pNew, pOper->rowSize); + free(pNew); } + mnodeDecDnodeRef(pDnode); return TSDB_CODE_SUCCESS; } From 566b286084eec3958f027386d0867ff7b6d26a68 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 11:23:44 +0000 Subject: [PATCH 7/7] [TD-860] invalid write in sdb --- src/mnode/src/mnodeSdb.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 9f46a77483..0c2e478a07 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -551,8 +551,9 @@ static int sdbWrite(void *param, void *data, int type) { // from app, oper is created if (pOper != NULL) { // forward to peers + pOper->processedCount = 0; int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - if (syncCode > 0) pOper->processedCount = 0; + if (syncCode <= 0) pOper->processedCount = 1; if (syncCode < 0) { sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,