[TD-860] add processed count for sdb sync
This commit is contained in:
parent
7fc487b8c1
commit
e35d89a4b4
|
@ -53,6 +53,7 @@ typedef struct {
|
||||||
void * rowData;
|
void * rowData;
|
||||||
int32_t rowSize;
|
int32_t rowSize;
|
||||||
int32_t retCode; // for callback in sdb queue
|
int32_t retCode; // for callback in sdb queue
|
||||||
|
int32_t processedCount; // for sync fwd callback
|
||||||
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
|
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
|
||||||
struct SMnodeMsg *pMsg;
|
struct SMnodeMsg *pMsg;
|
||||||
} SSdbOper;
|
} SSdbOper;
|
||||||
|
|
|
@ -247,20 +247,22 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
||||||
assert(param);
|
assert(param);
|
||||||
SSdbOper * pOper = param;
|
SSdbOper * pOper = param;
|
||||||
SMnodeMsg *pMsg = pOper->pMsg;
|
SMnodeMsg *pMsg = pOper->pMsg;
|
||||||
|
if (code <= 0) pOper->retCode = code;
|
||||||
|
|
||||||
if (code > 0) {
|
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
|
||||||
|
if (processedCount <= 1) {
|
||||||
if (pMsg != NULL) {
|
if (pMsg != NULL) {
|
||||||
sdbDebug("app:%p:%p, waiting for slave to confirm this operation", pMsg->rpcMsg.ahandle, pMsg);
|
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
if (pMsg != NULL) {
|
||||||
sdbDebug("app:%p:%p, is confirmed and will do callback func, code:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code));
|
sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOper->cb != NULL) {
|
if (pOper->cb != NULL) {
|
||||||
code = (*pOper->cb)(pMsg, code);
|
code = (*pOper->cb)(pMsg, pOper->retCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
dnodeSendRpcMnodeWriteRsp(pMsg, code);
|
dnodeSendRpcMnodeWriteRsp(pMsg, code);
|
||||||
|
@ -543,31 +545,34 @@ static int sdbWrite(void *param, void *data, int type) {
|
||||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
|
||||||
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
|
||||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
|
||||||
|
|
||||||
if (syncCode < 0) {
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||||
sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
||||||
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
||||||
return syncCode;
|
|
||||||
} else if (syncCode > 0) {
|
|
||||||
sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
||||||
syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
||||||
} else {
|
|
||||||
}
|
|
||||||
|
|
||||||
// from app, oper is created
|
// from app, oper is created
|
||||||
if (pOper != NULL) {
|
if (pOper != NULL) {
|
||||||
sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
|
// forward to peers
|
||||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
||||||
|
if (syncCode <= 0) atomic_add_fetch_32(&pOper->processedCount, 1);
|
||||||
|
|
||||||
|
if (syncCode < 0) {
|
||||||
|
sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
|
||||||
|
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||||
|
} else if (syncCode > 0) {
|
||||||
|
sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName,
|
||||||
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||||
|
} else {
|
||||||
|
sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName,
|
||||||
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||||
|
}
|
||||||
return syncCode;
|
return syncCode;
|
||||||
} else {
|
|
||||||
sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
||||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
|
||||||
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||||
|
|
||||||
|
// even it is WAL/FWD, it shall be called to update version in sync
|
||||||
|
syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
||||||
|
|
||||||
// from wal or forward msg, oper not created, should add into hash
|
// from wal or forward msg, oper not created, should add into hash
|
||||||
if (action == SDB_ACTION_INSERT) {
|
if (action == SDB_ACTION_INSERT) {
|
||||||
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
||||||
|
@ -972,11 +977,6 @@ static void *sdbWorkerFp(void *param) {
|
||||||
|
|
||||||
if (type == TAOS_QTYPE_RPC) {
|
if (type == TAOS_QTYPE_RPC) {
|
||||||
pOper = (SSdbOper *)item;
|
pOper = (SSdbOper *)item;
|
||||||
if (pOper == NULL) {
|
|
||||||
taosFreeQitem(item);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbDecRef(pOper->table, pOper->pObj);
|
sdbDecRef(pOper->table, pOper->pObj);
|
||||||
sdbConfirmForward(NULL, pOper, pOper->retCode);
|
sdbConfirmForward(NULL, pOper, pOper->retCode);
|
||||||
} else if (type == TAOS_QTYPE_FWD) {
|
} else if (type == TAOS_QTYPE_FWD) {
|
||||||
|
|
Loading…
Reference in New Issue