TD-10431 remove global variables
This commit is contained in:
parent
18c871a9f1
commit
1bf566a701
|
@ -126,14 +126,11 @@ typedef enum {
|
||||||
SDB_MAX = 12
|
SDB_MAX = 12
|
||||||
} ESdbType;
|
} ESdbType;
|
||||||
|
|
||||||
typedef struct SSdbOpt {
|
typedef struct SSdb SSdb;
|
||||||
const char *path;
|
typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
|
||||||
} SSdbOpt;
|
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
||||||
|
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj);
|
||||||
typedef int32_t (*SdbInsertFp)(void *pObj);
|
typedef int32_t (*SdbDeployFp)(SSdb*pSdb);
|
||||||
typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj);
|
|
||||||
typedef int32_t (*SdbDeleteFp)(void *pObj);
|
|
||||||
typedef int32_t (*SdbDeployFp)();
|
|
||||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||||
|
|
||||||
|
@ -148,26 +145,23 @@ typedef struct {
|
||||||
SdbDeleteFp deleteFp;
|
SdbDeleteFp deleteFp;
|
||||||
} SSdbTable;
|
} SSdbTable;
|
||||||
|
|
||||||
typedef struct SSdb SSdb;
|
typedef struct SSdbOpt {
|
||||||
|
const char *path;
|
||||||
|
} SSdbOpt;
|
||||||
|
|
||||||
SSdb *sdbOpen(SSdbOpt *pOption);
|
SSdb *sdbOpen(SSdbOpt *pOption);
|
||||||
void sdbClose(SSdb *pSdb);
|
void sdbClose(SSdb *pSdb);
|
||||||
|
int32_t sdbDeploy(SSdb *pSdb);
|
||||||
void sdbSetTable(SSdb *pSdb, SSdbTable table);
|
void sdbSetTable(SSdb *pSdb, SSdbTable table);
|
||||||
|
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw);
|
||||||
|
|
||||||
// int32_t sdbOpen();
|
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey);
|
||||||
// void sdbClose();
|
void sdbRelease(SSdb *pSdb, void *pObj);
|
||||||
int32_t sdbWrite(SSdbRaw *pRaw);
|
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
|
||||||
|
void sdbCancelFetch(SSdb *pSdb, void *pIter);
|
||||||
|
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
||||||
|
|
||||||
int32_t sdbDeploy();
|
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||||
void sdbUnDeploy();
|
|
||||||
|
|
||||||
void *sdbAcquire(ESdbType sdb, void *pKey);
|
|
||||||
void sdbRelease(void *pObj);
|
|
||||||
void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj);
|
|
||||||
void sdbCancelFetch(void *pIter);
|
|
||||||
int32_t sdbGetSize(ESdbType sdb);
|
|
||||||
|
|
||||||
SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen);
|
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||||
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
|
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
|
||||||
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val);
|
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val);
|
||||||
|
|
|
@ -82,6 +82,7 @@ typedef struct STrans {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
ETrnStage stage;
|
ETrnStage stage;
|
||||||
ETrnPolicy policy;
|
ETrnPolicy policy;
|
||||||
|
SMnode *pMnode;
|
||||||
void *rpcHandle;
|
void *rpcHandle;
|
||||||
SArray *redoLogs;
|
SArray *redoLogs;
|
||||||
SArray *undoLogs;
|
SArray *undoLogs;
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
int32_t mndInitTrans(SMnode *pMnode);
|
int32_t mndInitTrans(SMnode *pMnode);
|
||||||
void mndCleanupTrans(SMnode *pMnode);
|
void mndCleanupTrans(SMnode *pMnode);
|
||||||
|
|
||||||
STrans *trnCreate(ETrnPolicy policy, void *rpcHandle);
|
STrans *trnCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle);
|
||||||
void trnDrop(STrans *pTrans);
|
void trnDrop(STrans *pTrans);
|
||||||
int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
|
@ -34,8 +34,8 @@ int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg);
|
||||||
int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg);
|
int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg);
|
||||||
|
|
||||||
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData));
|
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData));
|
||||||
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code);
|
int32_t trnApply(SMnode *pMnode, SSdbRaw *pRaw, void *pData, int32_t code);
|
||||||
int32_t trnExecute(int32_t tranId);
|
int32_t trnExecute(SSdb *pSdb, int32_t tranId);
|
||||||
|
|
||||||
SSdbRaw *trnActionEncode(STrans *pTrans);
|
SSdbRaw *trnActionEncode(STrans *pTrans);
|
||||||
SSdbRow *trnActionDecode(SSdbRaw *pRaw);
|
SSdbRow *trnActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
|
@ -68,18 +68,18 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; }
|
static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { return 0; }
|
||||||
|
|
||||||
static int32_t mnodeAcctActionDelete(SAcctObj *pAcct) { return 0; }
|
static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { return 0; }
|
||||||
|
|
||||||
static int32_t mnodeAcctActionUpdate(SAcctObj *pSrcAcct, SAcctObj *pDstAcct) {
|
static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) {
|
||||||
SAcctObj tObj;
|
SAcctObj tObj;
|
||||||
int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj);
|
int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj);
|
||||||
memcpy(pDstAcct, pSrcAcct, len);
|
memcpy(pDstAcct, pSrcAcct, len);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeCreateDefaultAcct() {
|
static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SAcctObj acctObj = {0};
|
SAcctObj acctObj = {0};
|
||||||
|
@ -98,13 +98,13 @@ static int32_t mnodeCreateDefaultAcct() {
|
||||||
if (pRaw == NULL) return -1;
|
if (pRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
return sdbWrite(pRaw);
|
return sdbWrite(pSdb, pRaw);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndInitAcct(SMnode *pMnode) {
|
int32_t mndInitAcct(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_ACCT,
|
SSdbTable table = {.sdbType = SDB_ACCT,
|
||||||
.keyType = SDB_KEY_BINARY,
|
.keyType = SDB_KEY_BINARY,
|
||||||
.deployFp = (SdbDeployFp)mnodeCreateDefaultAcct,
|
.deployFp = mnodeCreateDefaultAcct,
|
||||||
.encodeFp = (SdbEncodeFp)mnodeAcctActionEncode,
|
.encodeFp = (SdbEncodeFp)mnodeAcctActionEncode,
|
||||||
.decodeFp = (SdbDecodeFp)mnodeAcctActionDecode,
|
.decodeFp = (SdbDecodeFp)mnodeAcctActionDecode,
|
||||||
.insertFp = (SdbInsertFp)mnodeAcctActionInsert,
|
.insertFp = (SdbInsertFp)mnodeAcctActionInsert,
|
||||||
|
|
|
@ -21,10 +21,10 @@
|
||||||
int32_t mndInitSync(SMnode *pMnode) { return 0; }
|
int32_t mndInitSync(SMnode *pMnode) { return 0; }
|
||||||
void mndCleanupSync(SMnode *pMnode) {}
|
void mndCleanupSync(SMnode *pMnode) {}
|
||||||
|
|
||||||
int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData) {
|
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, void *pData) {
|
||||||
trnApply(pData, pData, 0);
|
trnApply(pMnode, pData, pData, 0);
|
||||||
free(pData);
|
free(pData);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndIsMaster() { return true; }
|
bool mndIsMaster(SMnode *pMnode) { return true; }
|
|
@ -158,13 +158,13 @@ SSdbRow *trnActionDecode(SSdbRaw *pRaw) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t trnActionInsert(STrans *pTrans) {
|
static int32_t trnActionInsert(SSdb *pSdb, STrans *pTrans) {
|
||||||
SArray *pArray = pTrans->redoLogs;
|
SArray *pArray = pTrans->redoLogs;
|
||||||
int32_t arraySize = taosArrayGetSize(pArray);
|
int32_t arraySize = taosArrayGetSize(pArray);
|
||||||
|
|
||||||
for (int32_t index = 0; index < arraySize; ++index) {
|
for (int32_t index = 0; index < arraySize; ++index) {
|
||||||
SSdbRaw *pRaw = taosArrayGet(pArray, index);
|
SSdbRaw *pRaw = taosArrayGet(pArray, index);
|
||||||
int32_t code = sdbWrite(pRaw);
|
int32_t code = sdbWrite(pSdb, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
|
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
|
||||||
return code;
|
return code;
|
||||||
|
@ -175,13 +175,13 @@ static int32_t trnActionInsert(STrans *pTrans) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t trnActionDelete(STrans *pTrans) {
|
static int32_t trnActionDelete(SSdb *pSdb, STrans *pTrans) {
|
||||||
SArray *pArray = pTrans->redoLogs;
|
SArray *pArray = pTrans->redoLogs;
|
||||||
int32_t arraySize = taosArrayGetSize(pArray);
|
int32_t arraySize = taosArrayGetSize(pArray);
|
||||||
|
|
||||||
for (int32_t index = 0; index < arraySize; ++index) {
|
for (int32_t index = 0; index < arraySize; ++index) {
|
||||||
SSdbRaw *pRaw = taosArrayGet(pArray, index);
|
SSdbRaw *pRaw = taosArrayGet(pArray, index);
|
||||||
int32_t code = sdbWrite(pRaw);
|
int32_t code = sdbWrite(pSdb, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
|
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
|
||||||
return code;
|
return code;
|
||||||
|
@ -192,14 +192,14 @@ static int32_t trnActionDelete(STrans *pTrans) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t trnActionUpdate(STrans *pTrans, STrans *pDstTrans) {
|
static int32_t trnActionUpdate(SSdb *pSdb, STrans *pTrans, STrans *pDstTrans) {
|
||||||
assert(true);
|
assert(true);
|
||||||
SArray *pArray = pTrans->redoLogs;
|
SArray *pArray = pTrans->redoLogs;
|
||||||
int32_t arraySize = taosArrayGetSize(pArray);
|
int32_t arraySize = taosArrayGetSize(pArray);
|
||||||
|
|
||||||
for (int32_t index = 0; index < arraySize; ++index) {
|
for (int32_t index = 0; index < arraySize; ++index) {
|
||||||
SSdbRaw *pRaw = taosArrayGet(pArray, index);
|
SSdbRaw *pRaw = taosArrayGet(pArray, index);
|
||||||
int32_t code = sdbWrite(pRaw);
|
int32_t code = sdbWrite(pSdb, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
|
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
|
||||||
return code;
|
return code;
|
||||||
|
@ -213,7 +213,7 @@ static int32_t trnActionUpdate(STrans *pTrans, STrans *pDstTrans) {
|
||||||
|
|
||||||
static int32_t trnGenerateTransId() { return 1; }
|
static int32_t trnGenerateTransId() { return 1; }
|
||||||
|
|
||||||
STrans *trnCreate(ETrnPolicy policy, void *rpcHandle) {
|
STrans *trnCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
|
||||||
STrans *pTrans = calloc(1, sizeof(STrans));
|
STrans *pTrans = calloc(1, sizeof(STrans));
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -339,7 +339,7 @@ int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
||||||
|
|
||||||
if (sdbWrite(pRaw) != 0) {
|
if (sdbWrite(pTrans->pMnode->pSdb, pRaw) != 0) {
|
||||||
mError("trn:%d, failed to write trans since %s", pTrans->id, terrstr());
|
mError("trn:%d, failed to write trans since %s", pTrans->id, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -359,13 +359,13 @@ static void trnSendRpcRsp(void *rpcHandle, int32_t code) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
|
int32_t trnApply(SMnode *pMnode, SSdbRaw *pRaw, void *pData, int32_t code) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
trnSendRpcRsp(pData, terrno);
|
trnSendRpcRsp(pData, terrno);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sdbWrite(pData) != 0) {
|
if (sdbWrite(pMnode->pSdb, pData) != 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
trnSendRpcRsp(pData, code);
|
trnSendRpcRsp(pData, code);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
@ -375,10 +375,10 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t trnExecuteArray(SArray *pArray) {
|
static int32_t trnExecuteArray(SMnode *pMnode, SArray *pArray) {
|
||||||
for (int32_t index = 0; index < pArray->size; ++index) {
|
for (int32_t index = 0; index < pArray->size; ++index) {
|
||||||
SSdbRaw *pRaw = taosArrayGetP(pArray, index);
|
SSdbRaw *pRaw = taosArrayGetP(pArray, index);
|
||||||
if (sdbWrite(pRaw) != 0) {
|
if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -386,15 +386,15 @@ static int32_t trnExecuteArray(SArray *pArray) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); }
|
static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->redoLogs); }
|
||||||
|
|
||||||
static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); }
|
static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->undoLogs); }
|
||||||
|
|
||||||
static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); }
|
static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->commitLogs); }
|
||||||
|
|
||||||
static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); }
|
static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->redoActions); }
|
||||||
|
|
||||||
static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); }
|
static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->undoActions); }
|
||||||
|
|
||||||
static int32_t trnPerformPrepareStage(STrans *pTrans) {
|
static int32_t trnPerformPrepareStage(STrans *pTrans) {
|
||||||
if (trnExecuteRedoLogs(pTrans) == 0) {
|
if (trnExecuteRedoLogs(pTrans) == 0) {
|
||||||
|
@ -454,49 +454,49 @@ static int32_t trnPerformRetryStage(STrans *pTrans) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t trnExecute(int32_t tranId) {
|
int32_t trnExecute(SSdb *pSdb, int32_t tranId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
|
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &tranId);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->stage == TRN_STAGE_PREPARE) {
|
if (pTrans->stage == TRN_STAGE_PREPARE) {
|
||||||
if (trnPerformPrepareStage(pTrans) != 0) {
|
if (trnPerformPrepareStage(pTrans) != 0) {
|
||||||
sdbRelease(pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->stage == TRN_STAGE_EXECUTE) {
|
if (pTrans->stage == TRN_STAGE_EXECUTE) {
|
||||||
if (trnPerformExecuteStage(pTrans) != 0) {
|
if (trnPerformExecuteStage(pTrans) != 0) {
|
||||||
sdbRelease(pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->stage == TRN_STAGE_COMMIT) {
|
if (pTrans->stage == TRN_STAGE_COMMIT) {
|
||||||
if (trnPerformCommitStage(pTrans) != 0) {
|
if (trnPerformCommitStage(pTrans) != 0) {
|
||||||
sdbRelease(pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->stage == TRN_STAGE_ROLLBACK) {
|
if (pTrans->stage == TRN_STAGE_ROLLBACK) {
|
||||||
if (trnPerformRollbackStage(pTrans) != 0) {
|
if (trnPerformRollbackStage(pTrans) != 0) {
|
||||||
sdbRelease(pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->stage == TRN_STAGE_RETRY) {
|
if (pTrans->stage == TRN_STAGE_RETRY) {
|
||||||
if (trnPerformRetryStage(pTrans) != 0) {
|
if (trnPerformRetryStage(pTrans) != 0) {
|
||||||
sdbRelease(pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -60,14 +60,14 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUserActionInsert(SUserObj *pUser) {
|
static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
|
||||||
pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (pUser->prohibitDbHash == NULL) {
|
if (pUser->prohibitDbHash == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pUser->pAcct = sdbAcquire(SDB_ACCT, pUser->acct);
|
pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct);
|
||||||
if (pUser->pAcct == NULL) {
|
if (pUser->pAcct == NULL) {
|
||||||
terrno = TSDB_CODE_MND_ACCT_NOT_EXIST;
|
terrno = TSDB_CODE_MND_ACCT_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -76,28 +76,28 @@ static int32_t mndUserActionInsert(SUserObj *pUser) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUserActionDelete(SUserObj *pUser) {
|
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
|
||||||
if (pUser->prohibitDbHash) {
|
if (pUser->prohibitDbHash) {
|
||||||
taosHashCleanup(pUser->prohibitDbHash);
|
taosHashCleanup(pUser->prohibitDbHash);
|
||||||
pUser->prohibitDbHash = NULL;
|
pUser->prohibitDbHash = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pUser->acct != NULL) {
|
if (pUser->acct != NULL) {
|
||||||
sdbRelease(pUser->pAcct);
|
sdbRelease(pSdb, pUser->pAcct);
|
||||||
pUser->pAcct = NULL;
|
pUser->pAcct = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) {
|
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) {
|
||||||
SUserObj tObj;
|
SUserObj tObj;
|
||||||
int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj);
|
int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj);
|
||||||
memcpy(pDstUser, pSrcUser, len);
|
memcpy(pDstUser, pSrcUser, len);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDefaultUser(char *acct, char *user, char *pass) {
|
static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pass) {
|
||||||
SUserObj userObj = {0};
|
SUserObj userObj = {0};
|
||||||
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||||
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
||||||
|
@ -113,22 +113,22 @@ static int32_t mndCreateDefaultUser(char *acct, char *user, char *pass) {
|
||||||
if (pRaw == NULL) return -1;
|
if (pRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
return sdbWrite(pRaw);
|
return sdbWrite(pSdb, pRaw);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDefaultUsers() {
|
static int32_t mndCreateDefaultUsers(SSdb *pSdb) {
|
||||||
if (mndCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
|
if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
|
if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) {
|
static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, SMnodeMsg *pMsg) {
|
||||||
SUserObj userObj = {0};
|
SUserObj userObj = {0};
|
||||||
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||||
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
||||||
|
@ -137,7 +137,7 @@ static int32_t mndCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg
|
||||||
userObj.updateTime = userObj.createdTime;
|
userObj.updateTime = userObj.createdTime;
|
||||||
userObj.rootAuth = 0;
|
userObj.rootAuth = 0;
|
||||||
|
|
||||||
STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
STrans *pTrans = trnCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||||
if (pTrans == NULL) return -1;
|
if (pTrans == NULL) return -1;
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||||
|
@ -188,23 +188,23 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUserObj *pUser = sdbAcquire(SDB_USER, pCreate->user);
|
SUserObj *pUser = sdbAcquire(pMnode->pSdb, SDB_USER, pCreate->user);
|
||||||
if (pUser != NULL) {
|
if (pUser != NULL) {
|
||||||
sdbRelease(pUser);
|
sdbRelease(pMnode->pSdb, pUser);
|
||||||
terrno = TSDB_CODE_MND_USER_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_USER_ALREADY_EXIST;
|
||||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->conn.user);
|
SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->conn.user);
|
||||||
if (pOperUser == NULL) {
|
if (pOperUser == NULL) {
|
||||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg);
|
int32_t code = mndCreateUser(pMnode, pOperUser->acct, pCreate->user, pCreate->pass, pMsg);
|
||||||
sdbRelease(pOperUser);
|
sdbRelease(pMnode->pSdb, pOperUser);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||||
|
|
|
@ -278,7 +278,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
|
||||||
|
|
||||||
void mndDestroy(const char *path) {
|
void mndDestroy(const char *path) {
|
||||||
mDebug("mnode in %s will be destroyed", path);
|
mDebug("mnode in %s will be destroyed", path);
|
||||||
sdbUnDeploy();
|
taosRemoveDir(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
||||||
|
@ -308,7 +308,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
void mndCleanupMsg(SMnodeMsg *pMsg) {
|
void mndCleanupMsg(SMnodeMsg *pMsg) {
|
||||||
if (pMsg->pUser != NULL) {
|
if (pMsg->pUser != NULL) {
|
||||||
sdbRelease(pMsg->pUser);
|
sdbRelease(pMsg->pMnode->pSdb, pMsg->pUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
|
|
@ -69,9 +69,9 @@ typedef struct SSdb {
|
||||||
SdbDecodeFp decodeFps[SDB_MAX];
|
SdbDecodeFp decodeFps[SDB_MAX];
|
||||||
} SSdb;
|
} SSdb;
|
||||||
|
|
||||||
extern SSdb tsSdb;
|
int32_t sdbReadFile(SSdb *pSdb);
|
||||||
|
int32_t sdbWriteFile(SSdb *pSdb);
|
||||||
int32_t sdbWriteImp(SSdbRaw *pRaw);
|
int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,27 +15,28 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "sdbInt.h"
|
#include "sdbInt.h"
|
||||||
#include "tglobal.h"
|
|
||||||
|
|
||||||
SSdb tsSdb = {0};
|
|
||||||
|
|
||||||
SSdb *sdbOpen(SSdbOpt *pOption) {
|
SSdb *sdbOpen(SSdbOpt *pOption) {
|
||||||
|
mDebug("start to open sdb in %s", pOption->path);
|
||||||
|
|
||||||
SSdb *pSdb = calloc(1, sizeof(SSdb));
|
SSdb *pSdb = calloc(1, sizeof(SSdb));
|
||||||
if (pSdb == NULL) {
|
if (pSdb == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mError("failed to open sdb since %s", terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char path[PATH_MAX + 100];
|
char path[PATH_MAX + 100];
|
||||||
snprintf(path, PATH_MAX + 100, "%s%scur%s", pOption->path, TD_DIRSEP, TD_DIRSEP);
|
snprintf(path, PATH_MAX + 100, "%s%scur", pOption->path, TD_DIRSEP);
|
||||||
pSdb->currDir = strdup(path);
|
pSdb->currDir = strdup(path);
|
||||||
snprintf(path, PATH_MAX + 100, "%s%ssync%s", pOption->path, TD_DIRSEP, TD_DIRSEP);
|
snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP);
|
||||||
pSdb->syncDir = strdup(path);
|
pSdb->syncDir = strdup(path);
|
||||||
snprintf(path, PATH_MAX + 100, "%s%stmp%s", pOption->path, TD_DIRSEP, TD_DIRSEP);
|
snprintf(path, PATH_MAX + 100, "%s%stmp", pOption->path, TD_DIRSEP);
|
||||||
pSdb->tmpDir = strdup(path);
|
pSdb->tmpDir = strdup(path);
|
||||||
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
|
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
|
||||||
sdbClose(pSdb);
|
sdbClose(pSdb);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mError("failed to open sdb since %s", terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,6 +54,7 @@ SSdb *sdbOpen(SSdbOpt *pOption) {
|
||||||
if (hash == NULL) {
|
if (hash == NULL) {
|
||||||
sdbClose(pSdb);
|
sdbClose(pSdb);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mError("failed to open sdb since %s", terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,10 +62,27 @@ SSdb *sdbOpen(SSdbOpt *pOption) {
|
||||||
taosInitRWLatch(&pSdb->locks[i]);
|
taosInitRWLatch(&pSdb->locks[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
int32_t code = sdbReadFile(pSdb);
|
||||||
|
if (code != 0) {
|
||||||
|
sdbClose(pSdb);
|
||||||
|
terrno = code;
|
||||||
|
mError("failed to open sdb since %s", terrstr());
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("sdb open successfully");
|
||||||
|
return pSdb;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbClose(SSdb *pSdb) {
|
void sdbClose(SSdb *pSdb) {
|
||||||
|
mDebug("start to close sdb");
|
||||||
|
|
||||||
|
if (pSdb->curVer != pSdb->lastCommitVer) {
|
||||||
|
mDebug("start to write sdb file since curVer:% " PRId64 " and lastCommitVer:%" PRId64 " inequal", pSdb->curVer,
|
||||||
|
pSdb->lastCommitVer);
|
||||||
|
sdbWriteFile(pSdb);
|
||||||
|
}
|
||||||
|
|
||||||
if (pSdb->currDir != NULL) {
|
if (pSdb->currDir != NULL) {
|
||||||
tfree(pSdb->currDir);
|
tfree(pSdb->currDir);
|
||||||
}
|
}
|
||||||
|
@ -79,10 +98,13 @@ void sdbClose(SSdb *pSdb) {
|
||||||
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
||||||
SHashObj *hash = pSdb->hashObjs[i];
|
SHashObj *hash = pSdb->hashObjs[i];
|
||||||
if (hash != NULL) {
|
if (hash != NULL) {
|
||||||
|
taosHashClear(hash);
|
||||||
taosHashCleanup(hash);
|
taosHashCleanup(hash);
|
||||||
}
|
}
|
||||||
pSdb->hashObjs[i] = NULL;
|
pSdb->hashObjs[i] = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mDebug("sdb is closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
void sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
|
@ -94,4 +116,6 @@ void sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
pSdb->deployFps[sdb] = table.deployFp;
|
pSdb->deployFps[sdb] = table.deployFp;
|
||||||
pSdb->encodeFps[sdb] = table.encodeFp;
|
pSdb->encodeFps[sdb] = table.encodeFp;
|
||||||
pSdb->decodeFps[sdb] = table.decodeFp;
|
pSdb->decodeFps[sdb] = table.decodeFp;
|
||||||
|
|
||||||
|
mDebug("set sdb handle of table %d", pSdb, table);
|
||||||
}
|
}
|
|
@ -15,71 +15,75 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "sdbInt.h"
|
#include "sdbInt.h"
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
|
|
||||||
static int32_t sdbCreateDir() {
|
static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||||
mDebug("start to create mnode at %s", tsMnodeDir);
|
int32_t code = taosMkDir(pSdb->currDir);
|
||||||
|
if (code != 0) {
|
||||||
if (taosMkDir(tsSdb.currDir) != 0) {
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
mError("failed to create dir:%s since %s", pSdb->currDir, tstrerror(code));
|
||||||
mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr());
|
return code;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosMkDir(tsSdb.syncDir) != 0) {
|
code = taosMkDir(pSdb->syncDir);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
if (code != 0) {
|
||||||
mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr());
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
mError("failed to create dir:%s since %s", pSdb->syncDir, tstrerror(code));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosMkDir(tsSdb.tmpDir) != 0) {
|
code = taosMkDir(pSdb->tmpDir);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
if (code != 0) {
|
||||||
mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr());
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
mError("failed to create dir:%s since %s", pSdb->tmpDir, tstrerror(code));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbRunDeployFp() {
|
static int32_t sdbRunDeployFp(SSdb *pSdb) {
|
||||||
mDebug("start to run deploy functions");
|
mDebug("start to run sdb deploy functions");
|
||||||
|
|
||||||
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
|
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
|
||||||
SdbDeployFp fp = tsSdb.deployFps[i];
|
SdbDeployFp fp = pSdb->deployFps[i];
|
||||||
if (fp == NULL) continue;
|
if (fp == NULL) continue;
|
||||||
if ((*fp)() != 0) {
|
|
||||||
mError("failed to deploy sdb:%d since %s", i, terrstr());
|
int32_t code = (*fp)(pSdb);
|
||||||
return -1;
|
if (code != 0) {
|
||||||
|
mError("failed to deploy sdb:%d since %s", i, tstrerror(code));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("end of run deploy functions");
|
mDebug("sdb deploy functions run finished");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbReadDataFile() {
|
int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
|
|
||||||
if (pRaw == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
char file[PATH_MAX] = {0};
|
|
||||||
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
|
|
||||||
FileFd fd = taosOpenFileRead(file);
|
|
||||||
if (fd <= 0) {
|
|
||||||
free(pRaw);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
mError("failed to open file:%s for read since %s", file, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t readLen = 0;
|
int32_t readLen = 0;
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
|
|
||||||
|
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
|
||||||
|
if (pRaw == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mError("failed read file since %s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
char file[PATH_MAX] = {0};
|
||||||
|
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
|
||||||
|
FileFd fd = taosOpenFileRead(file);
|
||||||
|
if (fd <= 0) {
|
||||||
|
free(pRaw);
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to read file:%s since %s", file, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
readLen = sizeof(SSdbRaw);
|
readLen = sizeof(SSdbRaw);
|
||||||
ret = taosReadFile(fd, pRaw, readLen);
|
ret = taosReadFile(fd, pRaw, readLen);
|
||||||
|
@ -118,7 +122,7 @@ static int32_t sdbReadDataFile() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = sdbWriteImp(pRaw);
|
code = sdbWriteRaw(pSdb, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to read file:%s since %s", file, terrstr());
|
mError("failed to read file:%s since %s", file, terrstr());
|
||||||
goto PARSE_SDB_DATA_ERROR;
|
goto PARSE_SDB_DATA_ERROR;
|
||||||
|
@ -130,29 +134,31 @@ static int32_t sdbReadDataFile() {
|
||||||
PARSE_SDB_DATA_ERROR:
|
PARSE_SDB_DATA_ERROR:
|
||||||
taosCloseFile(fd);
|
taosCloseFile(fd);
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
terrno = code;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbWriteDataFile() {
|
int32_t sdbWriteFile(SSdb *pSdb) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
char tmpfile[PATH_MAX] = {0};
|
char tmpfile[PATH_MAX] = {0};
|
||||||
snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir);
|
snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", pSdb->tmpDir);
|
||||||
|
char curfile[PATH_MAX] = {0};
|
||||||
|
snprintf(curfile, sizeof(curfile), "%ssdb.data", pSdb->currDir);
|
||||||
|
|
||||||
FileFd fd = taosOpenFileCreateWrite(tmpfile);
|
FileFd fd = taosOpenFileCreateWrite(tmpfile);
|
||||||
if (fd <= 0) {
|
if (fd <= 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("failed to open file:%s for write since %s", tmpfile, terrstr());
|
mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code));
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
|
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
|
||||||
SdbEncodeFp encodeFp = tsSdb.encodeFps[i];
|
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
|
||||||
if (encodeFp == NULL) continue;
|
if (encodeFp == NULL) continue;
|
||||||
|
|
||||||
SHashObj *hash = tsSdb.hashObjs[i];
|
SHashObj *hash = pSdb->hashObjs[i];
|
||||||
SRWLatch *pLock = &tsSdb.locks[i];
|
SRWLatch *pLock = &pSdb->locks[i];
|
||||||
taosWLockLatch(pLock);
|
taosWLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow **ppRow = taosHashIterate(hash, NULL);
|
SSdbRow **ppRow = taosHashIterate(hash, NULL);
|
||||||
|
@ -192,68 +198,46 @@ static int32_t sdbWriteDataFile() {
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = taosFsyncFile(fd);
|
code = taosFsyncFile(fd);
|
||||||
|
if (code != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to write file:%s since %s", tmpfile, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(fd);
|
taosCloseFile(fd);
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
char curfile[PATH_MAX] = {0};
|
|
||||||
snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
|
|
||||||
code = taosRenameFile(tmpfile, curfile);
|
code = taosRenameFile(tmpfile, curfile);
|
||||||
|
if (code != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = code;
|
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
||||||
mError("failed to write sdb file since %s", terrstr());
|
|
||||||
} else {
|
} else {
|
||||||
mDebug("write sdb file successfully");
|
mDebug("write file:%s successfully", curfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t sdbOpen() {
|
int32_t sdbDeploy(SSdb *pSdb) {
|
||||||
// mDebug("start to read mnode file");
|
int32_t code = sdbCreateDir(pSdb);
|
||||||
|
if (code != 0) {
|
||||||
// if (sdbReadDataFile() != 0) {
|
return code;
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return 0;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// void sdbClose() {
|
|
||||||
// if (tsSdb.curVer != tsSdb.lastCommitVer) {
|
|
||||||
// mDebug("start to write mnode file");
|
|
||||||
// sdbWriteDataFile();
|
|
||||||
// }
|
|
||||||
|
|
||||||
// for (int32_t i = 0; i < SDB_MAX; ++i) {
|
|
||||||
// SHashObj *hash = tsSdb.hashObjs[i];
|
|
||||||
// if (hash != NULL) {
|
|
||||||
// taosHashClear(hash);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t sdbDeploy() {
|
|
||||||
if (sdbCreateDir() != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sdbRunDeployFp() != 0) {
|
code = sdbRunDeployFp(pSdb);
|
||||||
return -1;
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sdbWriteDataFile() != 0) {
|
code = sdbWriteFile(pSdb);
|
||||||
return -1;
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sdbClose();
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbUnDeploy() {
|
|
||||||
mDebug("start to undeploy mnode");
|
|
||||||
taosRemoveDir(tsMnodeDir);
|
|
||||||
}
|
|
||||||
|
|
|
@ -15,15 +15,14 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "sdbInt.h"
|
#include "sdbInt.h"
|
||||||
#include "tglobal.h"
|
|
||||||
|
|
||||||
static SHashObj *sdbGetHash(int32_t sdb) {
|
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
|
||||||
if (sdb >= SDB_MAX || sdb <= SDB_START) {
|
if (type >= SDB_MAX || type <= SDB_START) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
|
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SHashObj *hash = tsSdb.hashObjs[sdb];
|
SHashObj *hash = pSdb->hashObjs[type];
|
||||||
if (hash == NULL) {
|
if (hash == NULL) {
|
||||||
terrno = TSDB_CODE_SDB_APP_ERROR;
|
terrno = TSDB_CODE_SDB_APP_ERROR;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -32,9 +31,9 @@ static SHashObj *sdbGetHash(int32_t sdb) {
|
||||||
return hash;
|
return hash;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbGetkeySize(ESdbType sdb, void *pKey) {
|
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
|
||||||
int32_t keySize;
|
int32_t keySize;
|
||||||
EKeyType keyType = tsSdb.keyTypes[sdb];
|
EKeyType keyType = pSdb->keyTypes[type];
|
||||||
|
|
||||||
if (keyType == SDB_KEY_INT32) {
|
if (keyType == SDB_KEY_INT32) {
|
||||||
keySize = sizeof(int32_t);
|
keySize = sizeof(int32_t);
|
||||||
|
@ -47,77 +46,81 @@ static int32_t sdbGetkeySize(ESdbType sdb, void *pKey) {
|
||||||
return keySize;
|
return keySize;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
||||||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SRWLatch *pLock = &pSdb->locks[pRow->sdb];
|
||||||
taosWLockLatch(pLock);
|
taosWLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||||
if (pDstRow != NULL) {
|
if (pDstRow != NULL) {
|
||||||
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
|
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
sdbFreeRow(pRow);
|
sdbFreeRow(pRow);
|
||||||
return -1;
|
return TSDB_CODE_SDB_OBJ_ALREADY_THERE;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRow->refCount = 1;
|
pRow->refCount = 1;
|
||||||
pRow->status = pRaw->status;
|
pRow->status = pRaw->status;
|
||||||
|
|
||||||
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
|
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
sdbFreeRow(pRow);
|
sdbFreeRow(pRow);
|
||||||
return -1;
|
return TSDB_CODE_SDB_OBJ_ALREADY_THERE;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
|
|
||||||
SdbInsertFp insertFp = tsSdb.insertFps[pRow->sdb];
|
SdbInsertFp insertFp = pSdb->insertFps[pRow->sdb];
|
||||||
if (insertFp != NULL) {
|
if (insertFp != NULL) {
|
||||||
if ((*insertFp)(pRow->pObj) != 0) {
|
code = (*insertFp)(pSdb, pRow->pObj);
|
||||||
|
if (code != 0) {
|
||||||
taosWLockLatch(pLock);
|
taosWLockLatch(pLock);
|
||||||
taosHashRemove(hash, pRow->pObj, keySize);
|
taosHashRemove(hash, pRow->pObj, keySize);
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
sdbFreeRow(pRow);
|
sdbFreeRow(pRow);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
||||||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SRWLatch *pLock = &pSdb->locks[pRow->sdb];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||||
if (ppDstRow == NULL || *ppDstRow == NULL) {
|
if (ppDstRow == NULL || *ppDstRow == NULL) {
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
return sdbInsertRow(hash, pRaw, pRow, keySize);
|
return sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
|
||||||
}
|
}
|
||||||
SSdbRow *pDstRow = *ppDstRow;
|
SSdbRow *pDstRow = *ppDstRow;
|
||||||
|
|
||||||
pRow->status = pRaw->status;
|
pRow->status = pRaw->status;
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
|
|
||||||
SdbUpdateFp updateFp = tsSdb.updateFps[pRow->sdb];
|
SdbUpdateFp updateFp = pSdb->updateFps[pRow->sdb];
|
||||||
if (updateFp != NULL) {
|
if (updateFp != NULL) {
|
||||||
(*updateFp)(pRow->pObj, pDstRow->pObj);
|
code = (*updateFp)(pSdb, pRow->pObj, pDstRow->pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbFreeRow(pRow);
|
sdbFreeRow(pRow);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
||||||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SRWLatch *pLock = &pSdb->locks[pRow->sdb];
|
||||||
taosWLockLatch(pLock);
|
taosWLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||||
if (ppDstRow == NULL || *ppDstRow == NULL) {
|
if (ppDstRow == NULL || *ppDstRow == NULL) {
|
||||||
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
sdbFreeRow(pRow);
|
sdbFreeRow(pRow);
|
||||||
return -1;
|
return TSDB_CODE_SDB_OBJ_NOT_THERE;
|
||||||
}
|
}
|
||||||
SSdbRow *pDstRow = *ppDstRow;
|
SSdbRow *pDstRow = *ppDstRow;
|
||||||
|
|
||||||
|
@ -125,71 +128,67 @@ static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_
|
||||||
taosHashRemove(hash, pDstRow->pObj, keySize);
|
taosHashRemove(hash, pDstRow->pObj, keySize);
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
|
|
||||||
SdbDeleteFp deleteFp = tsSdb.deleteFps[pDstRow->sdb];
|
SdbDeleteFp deleteFp = pSdb->deleteFps[pDstRow->sdb];
|
||||||
if (deleteFp != NULL) {
|
if (deleteFp != NULL) {
|
||||||
(void)(*deleteFp)(pDstRow->pObj);
|
code = (*deleteFp)(pSdb, pDstRow->pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pDstRow->pObj);
|
sdbRelease(pSdb, pDstRow->pObj);
|
||||||
sdbFreeRow(pRow);
|
sdbFreeRow(pRow);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbWriteImp(SSdbRaw *pRaw) {
|
int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) {
|
||||||
SHashObj *hash = sdbGetHash(pRaw->sdb);
|
SHashObj *hash = sdbGetHash(pSdb, pRaw->sdb);
|
||||||
if (hash == NULL) return -1;
|
if (hash == NULL) return terrno;
|
||||||
|
|
||||||
SdbDecodeFp decodeFp = tsSdb.decodeFps[pRaw->sdb];
|
SdbDecodeFp decodeFp = pSdb->decodeFps[pRaw->sdb];
|
||||||
SSdbRow *pRow = (*decodeFp)(pRaw);
|
SSdbRow *pRow = (*decodeFp)(pRaw);
|
||||||
if (pRow == NULL) {
|
if (pRow == NULL) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
return terrno;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pRow->sdb = pRaw->sdb;
|
pRow->sdb = pRaw->sdb;
|
||||||
|
|
||||||
int32_t keySize = sdbGetkeySize(pRow->sdb, pRow->pObj);
|
int32_t keySize = sdbGetkeySize(pSdb, pRow->sdb, pRow->pObj);
|
||||||
int32_t code = -1;
|
int32_t code = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
|
||||||
|
|
||||||
switch (pRaw->status) {
|
switch (pRaw->status) {
|
||||||
case SDB_STATUS_CREATING:
|
case SDB_STATUS_CREATING:
|
||||||
code = sdbInsertRow(hash, pRaw, pRow, keySize);
|
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
|
||||||
break;
|
break;
|
||||||
case SDB_STATUS_READY:
|
case SDB_STATUS_READY:
|
||||||
case SDB_STATUS_DROPPING:
|
case SDB_STATUS_DROPPING:
|
||||||
code = sdbUpdateRow(hash, pRaw, pRow, keySize);
|
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
|
||||||
break;
|
break;
|
||||||
case SDB_STATUS_DROPPED:
|
case SDB_STATUS_DROPPED:
|
||||||
code = sdbDeleteRow(hash, pRaw, pRow, keySize);
|
code = sdbDeleteRow(pSdb, hash, pRaw, pRow, keySize);
|
||||||
break;
|
|
||||||
default:
|
|
||||||
terrno = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbWrite(SSdbRaw *pRaw) {
|
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
|
||||||
int32_t code = sdbWriteImp(pRaw);
|
int32_t code = sdbWriteRaw(pSdb, pRaw);
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *sdbAcquire(ESdbType sdb, void *pKey) {
|
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
|
||||||
SHashObj *hash = sdbGetHash(sdb);
|
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||||
if (hash == NULL) return NULL;
|
if (hash == NULL) return NULL;
|
||||||
|
|
||||||
void *pRet = NULL;
|
void *pRet = NULL;
|
||||||
int32_t keySize = sdbGetkeySize(sdb, pKey);
|
int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
|
||||||
|
|
||||||
SRWLatch *pLock = &tsSdb.locks[sdb];
|
SRWLatch *pLock = &pSdb->locks[type];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
|
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
|
||||||
if (ppRow == NULL || *ppRow == NULL) {
|
if (ppRow == NULL || *ppRow == NULL) {
|
||||||
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
|
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,13 +213,13 @@ void *sdbAcquire(ESdbType sdb, void *pKey) {
|
||||||
return pRet;
|
return pRet;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbRelease(void *pObj) {
|
void sdbRelease(SSdb *pSdb, void *pObj) {
|
||||||
if (pObj == NULL) return;
|
if (pObj == NULL) return;
|
||||||
|
|
||||||
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
|
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
|
||||||
if (pRow->sdb >= SDB_MAX || pRow->sdb <= SDB_START) return;
|
if (pRow->sdb >= SDB_MAX || pRow->sdb <= SDB_START) return;
|
||||||
|
|
||||||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
SRWLatch *pLock = &pSdb->locks[pRow->sdb];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
|
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
|
||||||
|
@ -231,11 +230,11 @@ void sdbRelease(void *pObj) {
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj) {
|
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
|
||||||
SHashObj *hash = sdbGetHash(sdb);
|
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||||
if (hash == NULL) return NULL;
|
if (hash == NULL) return NULL;
|
||||||
|
|
||||||
SRWLatch *pLock = &tsSdb.locks[sdb];
|
SRWLatch *pLock = &pSdb->locks[type];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow **ppRow = taosHashIterate(hash, ppRow);
|
SSdbRow **ppRow = taosHashIterate(hash, ppRow);
|
||||||
|
@ -255,23 +254,23 @@ void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj) {
|
||||||
return ppRow;
|
return ppRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbCancelFetch(void *pIter) {
|
void sdbCancelFetch(SSdb *pSdb, void *pIter) {
|
||||||
if (pIter == NULL) return;
|
if (pIter == NULL) return;
|
||||||
SSdbRow *pRow = *(SSdbRow **)pIter;
|
SSdbRow *pRow = *(SSdbRow **)pIter;
|
||||||
SHashObj *hash = sdbGetHash(pRow->sdb);
|
SHashObj *hash = sdbGetHash(pSdb, pRow->sdb);
|
||||||
if (hash == NULL) return;
|
if (hash == NULL) return;
|
||||||
|
|
||||||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
SRWLatch *pLock = &pSdb->locks[pRow->sdb];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
taosHashCancelIterate(hash, pIter);
|
taosHashCancelIterate(hash, pIter);
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbGetSize(ESdbType sdb) {
|
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
|
||||||
SHashObj *hash = sdbGetHash(sdb);
|
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||||
if (hash == NULL) return 0;
|
if (hash == NULL) return 0;
|
||||||
|
|
||||||
SRWLatch *pLock = &tsSdb.locks[sdb];
|
SRWLatch *pLock = &pSdb->locks[type];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
int32_t size = taosHashGetSize(hash);
|
int32_t size = taosHashGetSize(hash);
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
|
|
|
@ -35,4 +35,4 @@ void *sdbGetRowObj(SSdbRow *pRow) {
|
||||||
return pRow->pObj;
|
return pRow->pObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbFreeRow(SSdbRow *pRow) { free(pRow); }
|
void sdbFreeRow(SSdbRow *pRow) { tfree(pRow); }
|
||||||
|
|
Loading…
Reference in New Issue