|
|
|
@ -72,8 +72,6 @@ typedef struct {
|
|
|
|
|
void * sync;
|
|
|
|
|
void * wal;
|
|
|
|
|
SSyncCfg cfg;
|
|
|
|
|
sem_t sem;
|
|
|
|
|
int32_t code;
|
|
|
|
|
int32_t numOfTables;
|
|
|
|
|
SSdbTable *tableList[SDB_TABLE_MAX];
|
|
|
|
|
pthread_mutex_t mutex;
|
|
|
|
@ -244,27 +242,36 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
|
|
|
|
|
sdbUpdateMnodeRoles();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
FORCE_INLINE
|
|
|
|
|
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
|
|
|
|
tsSdbObj.code = code;
|
|
|
|
|
sem_post(&tsSdbObj.sem);
|
|
|
|
|
sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
assert(param);
|
|
|
|
|
SSdbOper * pOper = param;
|
|
|
|
|
SMnodeMsg *pMsg = pOper->pMsg;
|
|
|
|
|
if (code <= 0) pOper->retCode = code;
|
|
|
|
|
|
|
|
|
|
static int32_t sdbForwardToPeer(SWalHead *pHead) {
|
|
|
|
|
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
|
|
|
|
|
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
|
|
|
|
|
if (processedCount <= 1) {
|
|
|
|
|
if (pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC);
|
|
|
|
|
if (code > 0) {
|
|
|
|
|
sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
|
|
|
|
|
sem_wait(&tsSdbObj.sem);
|
|
|
|
|
return tsSdbObj.code;
|
|
|
|
|
}
|
|
|
|
|
return code;
|
|
|
|
|
if (pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pOper->cb != NULL) {
|
|
|
|
|
pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
|
|
|
|
|
taosFreeQitem(pOper);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void sdbUpdateSync() {
|
|
|
|
|
SSyncCfg syncCfg = {0};
|
|
|
|
|
int32_t index = 0;
|
|
|
|
|
int32_t index = 0;
|
|
|
|
|
|
|
|
|
|
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
|
|
|
|
|
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
|
|
|
|
@ -298,7 +305,7 @@ void sdbUpdateSync() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
syncCfg.replica = index;
|
|
|
|
|
syncCfg.quorum = (syncCfg.replica == 1) ? 1:2;
|
|
|
|
|
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
|
|
|
|
|
|
|
|
|
|
bool hasThisDnode = false;
|
|
|
|
|
for (int32_t i = 0; i < syncCfg.replica; ++i) {
|
|
|
|
@ -325,10 +332,10 @@ void sdbUpdateSync() {
|
|
|
|
|
syncInfo.getWalInfo = sdbGetWalInfo;
|
|
|
|
|
syncInfo.getFileInfo = sdbGetFileInfo;
|
|
|
|
|
syncInfo.writeToCache = sdbWriteToQueue;
|
|
|
|
|
syncInfo.confirmForward = sdbConfirmForward;
|
|
|
|
|
syncInfo.confirmForward = sdbConfirmForward;
|
|
|
|
|
syncInfo.notifyRole = sdbNotifyRole;
|
|
|
|
|
tsSdbObj.cfg = syncCfg;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (tsSdbObj.sync) {
|
|
|
|
|
syncReconfig(tsSdbObj.sync, &syncCfg);
|
|
|
|
|
} else {
|
|
|
|
@ -339,7 +346,6 @@ void sdbUpdateSync() {
|
|
|
|
|
|
|
|
|
|
int32_t sdbInit() {
|
|
|
|
|
pthread_mutex_init(&tsSdbObj.mutex, NULL);
|
|
|
|
|
sem_init(&tsSdbObj.sem, 0, 0);
|
|
|
|
|
|
|
|
|
|
if (sdbInitWriteWorker() != 0) {
|
|
|
|
|
return -1;
|
|
|
|
@ -379,7 +385,6 @@ void sdbCleanUp() {
|
|
|
|
|
tsSdbObj.wal = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sem_destroy(&tsSdbObj.sem);
|
|
|
|
|
pthread_mutex_destroy(&tsSdbObj.mutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -513,24 +518,22 @@ static int sdbWrite(void *param, void *data, int type) {
|
|
|
|
|
assert(pTable != NULL);
|
|
|
|
|
|
|
|
|
|
pthread_mutex_lock(&tsSdbObj.mutex);
|
|
|
|
|
|
|
|
|
|
if (pHead->version == 0) {
|
|
|
|
|
// assign version
|
|
|
|
|
// assign version
|
|
|
|
|
tsSdbObj.version++;
|
|
|
|
|
pHead->version = tsSdbObj.version;
|
|
|
|
|
} else {
|
|
|
|
|
// for data from WAL or forward, version may be smaller
|
|
|
|
|
if (pHead->version <= tsSdbObj.version) {
|
|
|
|
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
|
|
|
|
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
|
|
|
|
|
sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
|
|
|
|
|
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
|
|
|
|
|
}
|
|
|
|
|
sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
|
|
|
|
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
} else if (pHead->version != tsSdbObj.version + 1) {
|
|
|
|
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
|
|
|
|
sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64,
|
|
|
|
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
|
|
|
|
|
tsSdbObj.version);
|
|
|
|
|
sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
|
|
|
|
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
|
|
|
|
|
return TSDB_CODE_MND_APP_ERROR;
|
|
|
|
|
} else {
|
|
|
|
|
tsSdbObj.version = pHead->version;
|
|
|
|
@ -542,28 +545,36 @@ static int sdbWrite(void *param, void *data, int type) {
|
|
|
|
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = sdbForwardToPeer(pHead);
|
|
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
|
|
|
|
|
|
|
|
|
// from app, oper is created
|
|
|
|
|
if (pOper != NULL) {
|
|
|
|
|
sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s",
|
|
|
|
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
|
|
|
|
|
tstrerror(code));
|
|
|
|
|
return code;
|
|
|
|
|
// forward to peers
|
|
|
|
|
pOper->processedCount = 0;
|
|
|
|
|
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
|
|
|
|
if (syncCode <= 0) pOper->processedCount = 1;
|
|
|
|
|
|
|
|
|
|
if (syncCode < 0) {
|
|
|
|
|
sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
|
|
|
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
|
|
|
} else if (syncCode > 0) {
|
|
|
|
|
sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
|
|
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
|
|
|
} else {
|
|
|
|
|
sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
|
|
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
|
|
|
}
|
|
|
|
|
return syncCode;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
|
|
|
|
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
|
|
|
|
|
|
|
|
// even it is WAL/FWD, it shall be called to update version in sync
|
|
|
|
|
syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
|
|
|
|
|
|
|
|
|
// from wal or forward msg, oper not created, should add into hash
|
|
|
|
|
if (tsSdbObj.sync != NULL) {
|
|
|
|
|
sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
|
|
|
|
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
|
|
|
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
|
|
|
|
|
} else {
|
|
|
|
|
sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
|
|
|
|
|
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (action == SDB_ACTION_INSERT) {
|
|
|
|
|
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
|
|
|
|
code = (*pTable->decodeFp)(&oper);
|
|
|
|
@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|
|
|
|
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
|
|
|
|
|
|
|
|
|
if (pNewOper->pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
|
|
|
|
|
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
|
|
|
|
|
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
|
|
|
|
|
|
|
|
|
if (pNewOper->pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
|
|
|
|
|
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
|
|
|
|
|
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
|
|
|
|
|
|
|
|
|
if (pNewOper->pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
|
|
|
|
|
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) {
|
|
|
|
|
taosGetQitem(tsSdbWriteQall, &type, &item);
|
|
|
|
|
if (type == TAOS_QTYPE_RPC) {
|
|
|
|
|
pOper = (SSdbOper *)item;
|
|
|
|
|
pOper->processedCount = 1;
|
|
|
|
|
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
|
|
|
|
if (pOper->pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
|
|
|
|
|
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
|
|
|
|
|
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
pHead = (SWalHead *)item;
|
|
|
|
|
pOper = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pOper != NULL && pOper->pMsg != NULL) {
|
|
|
|
|
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
|
|
|
|
|
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
|
|
|
|
|
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = sdbWrite(pOper, pHead, type);
|
|
|
|
|
if (pOper) pOper->retCode = code;
|
|
|
|
|
if (pOper && code <= 0) pOper->retCode = code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
walFsync(tsSdbObj.wal);
|
|
|
|
@ -965,25 +976,17 @@ static void *sdbWorkerFp(void *param) {
|
|
|
|
|
taosResetQitems(tsSdbWriteQall);
|
|
|
|
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
|
|
|
|
taosGetQitem(tsSdbWriteQall, &type, &item);
|
|
|
|
|
|
|
|
|
|
if (type == TAOS_QTYPE_RPC) {
|
|
|
|
|
pOper = (SSdbOper *)item;
|
|
|
|
|
if (pOper != NULL && pOper->cb != NULL) {
|
|
|
|
|
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
|
|
|
|
|
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pOper != NULL && pOper->pMsg != NULL) {
|
|
|
|
|
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
|
|
|
|
|
tstrerror(pOper->retCode));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pOper != NULL) {
|
|
|
|
|
sdbDecRef(pOper->table, pOper->pObj);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
|
|
|
|
|
sdbDecRef(pOper->table, pOper->pObj);
|
|
|
|
|
sdbConfirmForward(NULL, pOper, pOper->retCode);
|
|
|
|
|
} else if (type == TAOS_QTYPE_FWD) {
|
|
|
|
|
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
|
|
|
|
|
taosFreeQitem(item);
|
|
|
|
|
} else {
|
|
|
|
|
taosFreeQitem(item);
|
|
|
|
|
}
|
|
|
|
|
taosFreeQitem(item);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|