From f26cf444f661df0d2564789cdde99f869abee86a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 12 Nov 2021 15:06:58 +0800 Subject: [PATCH] refact sdb --- include/dnode/mnode/sdb/sdb.h | 145 +++++---- include/util/taoserror.h | 3 +- source/dnode/mnode/impl/src/mnodeAcct.c | 93 +++--- source/dnode/mnode/impl/src/mnodeSync.c | 4 +- source/dnode/mnode/impl/src/mnodeUser.c | 86 ++--- source/dnode/mnode/sdb/inc/sdbInt.h | 24 +- source/dnode/mnode/sdb/src/sdb.c | 323 +------------------ source/dnode/mnode/sdb/src/sdbFile.c | 204 ++++++++++++ source/dnode/mnode/sdb/src/sdbHash.c | 199 ++++++++++++ source/dnode/mnode/sdb/src/sdbRaw.c | 206 ++++++++++++ source/dnode/mnode/sdb/src/sdbRow.c | 38 +++ source/dnode/mnode/transaction/inc/trnInt.h | 18 +- source/dnode/mnode/transaction/src/trn.c | 107 +++--- source/dnode/mnode/transaction/src/trnExec.c | 2 +- source/util/src/terror.c | 2 +- 15 files changed, 896 insertions(+), 558 deletions(-) create mode 100644 source/dnode/mnode/sdb/src/sdbFile.c create mode 100644 source/dnode/mnode/sdb/src/sdbHash.c create mode 100644 source/dnode/mnode/sdb/src/sdbRaw.c create mode 100644 source/dnode/mnode/sdb/src/sdbRow.c diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index f4a8ccd06e..c693f8e64a 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -20,58 +20,84 @@ extern "C" { #endif -#define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \ - if (code == 0) { \ - if ((dataLen) >= (valLen)) { \ - memcpy((val), (char *)(pData), (valLen)); \ - (dataLen) -= (valLen); \ - (pData) = (char *)(pData) + (valLen); \ - } else { \ - code = TSDB_CODE_SDB_INVALID_DATA_LEN; \ - } \ +#define SDB_GET_INT64(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int64_t); \ } -#define SDB_GET_INT32_VAL(pData, dataLen, val, code) \ - if (code == 0) { \ - if (dataLen >= sizeof(int32_t)) { \ - *(int32_t *)(pData) = (int32_t)(val); \ - (dataLen) -= sizeof(int32_t); \ - (pData) = (char *)(pData) + sizeof(int32_t); \ - } else { \ - code = TSDB_CODE_SDB_INVALID_DATA_LEN; \ - } \ +#define SDB_GET_INT32(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int32_t); \ } -#define SDB_GET_INT64_VAL(pData, dataLen, val, code) \ - if (code == 0) { \ - if (dataLen >= sizeof(int64_t)) { \ - *(int64_t *)(pData) = (int64_t)(val); \ - (dataLen) -= sizeof(int64_t); \ - (pData) = (char *)(pData) + sizeof(int64_t); \ - } else { \ - code = TSDB_CODE_SDB_INVALID_DATA_LEN; \ - } \ +#define SDB_GET_INT8(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int8_t); \ } -#define SDB_SET_INT64_VAL(pData, dataLen, val) \ - { \ - *(int64_t *)(pData) = (int64_t)(val); \ - (dataLen) += sizeof(int64_t); \ - (pData) += sizeof(int64_t); \ +#define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \ + { \ + if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += valLen; \ } -#define SDB_SET_INT32_VAL(pData, dataLen, val) \ - { \ - *(int32_t *)(pData) = (int32_t)(val); \ - (dataLen) += sizeof(int32_t); \ - (pData) += sizeof(int32_t); \ +#define SDB_SET_INT64(pData, dataPos, val) \ + { \ + if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += sizeof(int64_t); \ } -#define SDB_SET_BINARY_VAL(pData, dataLen, val, valLen) \ - { \ - memcpy((char *)(pData), (val), (valLen)); \ - (dataLen) += (valLen); \ - (pData) += (valLen); \ +#define SDB_SET_INT32(pData, dataPos, val) \ + { \ + if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += sizeof(int32_t); \ + } + +#define SDB_SET_INT8(pData, dataPos, val) \ + { \ + if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += sizeof(int8_t); \ + } + +#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \ + { \ + if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += valLen; \ + } + +#define SDB_SET_DATALEN(pRaw, dataLen) \ + { \ + if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ } typedef enum { @@ -93,23 +119,14 @@ typedef enum { typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, SDB_STATUS_DROPPING = 3 } ESdbStatus; - -typedef struct { - int8_t type; - int8_t sver; - int8_t status; - int8_t action; - int8_t reserved[4]; - int32_t cksum; - int32_t dataLen; - char data[]; -} SSdbRaw; +typedef struct SSdbRaw SSdbRaw; +typedef struct SSdbRow SSdbRow; typedef int32_t (*SdbInsertFp)(void *pObj); typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(void *pObj); typedef int32_t (*SdbDeployFp)(); -typedef void *(*SdbDecodeFp)(SSdbRaw *pRaw); +typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef struct { @@ -140,6 +157,26 @@ void *sdbFetch(ESdbType sdb, void *pIter); void sdbCancelFetch(ESdbType sdb, void *pIter); int32_t sdbGetSize(ESdbType sdb); +SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen); +void sdbFreeRaw(SSdbRaw *pRaw); +int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); +int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val); +int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); +int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen); +int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); +int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); +int32_t sdbSetRawAction(SSdbRaw *pRaw, ESdbAction action); +int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); +int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); +int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); +int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen); +int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); +int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); + +SSdbRow *sdbAllocRow(int32_t objSize); +void sdbFreeRow(SSdbRow *pRow); +void *sdbGetRowObj(SSdbRow *pRow); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b644f4c9a5..dcd8b9a743 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -133,7 +133,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir") #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components") - #define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321) #define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322) @@ -143,7 +142,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326) #define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0327) #define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0328) -#define TSDB_CODE_SDB_INVALID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0329) +#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x0329) #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists") #define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist") diff --git a/source/dnode/mnode/impl/src/mnodeAcct.c b/source/dnode/mnode/impl/src/mnodeAcct.c index 381d6a184e..cd9087ee30 100644 --- a/source/dnode/mnode/impl/src/mnodeAcct.c +++ b/source/dnode/mnode/impl/src/mnodeAcct.c @@ -16,69 +16,56 @@ #define _DEFAULT_SOURCE #include "mnodeInt.h" -#define ACCT_VER 1 +#define SDB_ACCT_VER 1 static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) { - SSdbRaw *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRaw)); - if (pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, SDB_ACCT_VER, sizeof(SAcctObj)); + if (pRaw == NULL) return NULL; - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN) - SDB_SET_INT64_VAL(pData, dataLen, pAcct->createdTime) - SDB_SET_INT64_VAL(pData, dataLen, pAcct->updateTime) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->acctId) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->status) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams) - SDB_SET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState) + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN) + SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime) + SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime) + SDB_SET_INT32(pRaw, dataPos, pAcct->acctId) + SDB_SET_INT32(pRaw, dataPos, pAcct->status) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams) + SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState) + SDB_SET_DATALEN(pRaw, dataPos); - pRaw->dataLen = dataLen; - pRaw->type = SDB_ACCT; - pRaw->sver = ACCT_VER; return pRaw; } -static SAcctObj *mnodeAcctActionDecode(SSdbRaw *pRaw) { - if (pRaw->sver != ACCT_VER) { +static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_ACCT_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } - SAcctObj *pAcct = calloc(1, sizeof(SAcctObj)); - if (pAcct == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); + SAcctObj *pAcct = sdbGetRowObj(pRow); + if (pAcct == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pAcct->acct, TSDB_USER_LEN) + SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->updateTime) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->acctId) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->status) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxUsers) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxDbs) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxTimeSeries) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxStreams) + SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->cfg.maxStorage) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.accessState) - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN, code) - SDB_GET_INT64_VAL(pData, dataLen, pAcct->createdTime, code) - SDB_GET_INT64_VAL(pData, dataLen, pAcct->updateTime, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->acctId, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->status, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams, code) - SDB_GET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState, code) - - if (code != 0) { - tfree(pAcct); - terrno = code; - return NULL; - } - - return pAcct; + return pRow; } static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; } @@ -106,9 +93,7 @@ static int32_t mnodeCreateDefaultAcct() { .accessState = TSDB_VN_ALL_ACCCESS}; SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj); - if (pRaw == NULL) { - return -1; - } + if (pRaw == NULL) return -1; return sdbWrite(pRaw); } diff --git a/source/dnode/mnode/impl/src/mnodeSync.c b/source/dnode/mnode/impl/src/mnodeSync.c index 7541ab6b59..fd34793172 100644 --- a/source/dnode/mnode/impl/src/mnodeSync.c +++ b/source/dnode/mnode/impl/src/mnodeSync.c @@ -21,8 +21,8 @@ int32_t mnodeInitSync() { return 0; } void mnodeCleanUpSync() {} int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) { - trnApply(pRaw, pData, 0); - free(pRaw); + trnApply(pData, pData, 0); + free(pData); return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c index b15dc78ea9..d74ba0f7cc 100644 --- a/source/dnode/mnode/impl/src/mnodeUser.c +++ b/source/dnode/mnode/impl/src/mnodeUser.c @@ -19,59 +19,46 @@ #include "tglobal.h" #include "tkey.h" -#define USER_VER 1 +#define SDB_USER_VER 1 static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { - SSdbRaw *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRaw)); - if (pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj)); + if (pRaw == NULL) return NULL; - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN) - SDB_SET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN) - SDB_SET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_KEY_LEN) - SDB_SET_INT64_VAL(pData, dataLen, pUser->createdTime) - SDB_SET_INT64_VAL(pData, dataLen, pUser->updateTime) - SDB_SET_INT32_VAL(pData, dataLen, pUser->rootAuth) + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN) + SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN) + SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_KEY_LEN) + SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) + SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) + SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth) + SDB_SET_DATALEN(pRaw, dataPos); - pRaw->dataLen = dataLen; - pRaw->type = SDB_USER; - pRaw->sver = USER_VER; return pRaw; } -static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) { - if (pRaw->sver != USER_VER) { +static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_USER_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } - SUserObj *pUser = calloc(1, sizeof(SUserObj)); - if (pUser == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); + SUserObj *pUser = sdbGetRowObj(pRow); + if (pUser == NULL) return NULL; - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN, code) - SDB_GET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN, code) - SDB_GET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_USER_LEN, code) - SDB_GET_INT64_VAL(pData, dataLen, pUser->createdTime, code) - SDB_GET_INT64_VAL(pData, dataLen, pUser->updateTime, code) - SDB_GET_INT32_VAL(pData, dataLen, pUser->rootAuth, code) + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN) + SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_KEY_LEN) + SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN) + SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime) + SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->rootAuth) - if (code != 0) { - tfree(pUser); - terrno = code; - return NULL; - } - - return pUser; + return pRow; } static int32_t mnodeUserActionInsert(SUserObj *pUser) { @@ -122,9 +109,7 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { } SSdbRaw *pRaw = mnodeUserActionEncode(&userObj); - if (pRaw == NULL) { - return -1; - } + if (pRaw == NULL) return -1; return sdbWrite(pRaw); } @@ -156,32 +141,31 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK); if (pTrans == NULL) return -1; + trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle); SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj); if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) { trnDrop(pTrans); return -1; } - pRedoRaw->status = SDB_STATUS_CREATING; - pRedoRaw->action = SDB_ACTION_INSERT; + sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); + sdbSetRawAction(pRedoRaw, SDB_ACTION_INSERT); SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { trnDrop(pTrans); return -1; } - pUndoRaw->status = SDB_STATUS_DROPPING; - pUndoRaw->action = SDB_ACTION_DELETE; + sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPING); + sdbSetRawAction(pUndoRaw, SDB_ACTION_DELETE); SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { trnDrop(pTrans); return -1; } - pCommitRaw->status = SDB_STATUS_READY; - pCommitRaw->action = SDB_ACTION_UPDATE; - - trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle); + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + sdbSetRawAction(pCommitRaw, SDB_ACTION_UPDATE); if (trnPrepare(pTrans, mnodeSyncPropose) != 0) { trnDrop(pTrans); diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 222e16abe9..5ff1c892a1 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -36,6 +36,23 @@ extern "C" { #define SDB_MAX_SIZE (32 * 1024) +typedef struct SSdbRaw { + int8_t sdb; + int8_t sver; + int8_t status; + int8_t action; + int8_t reserved[4]; + int32_t cksum; + int32_t dataLen; + char pData[]; +} SSdbRaw; + +typedef struct SSdbRow { + ESdbStatus status; + int32_t refCount; + char pObj[]; +} SSdbRow; + typedef struct { char *currDir; char *syncDir; @@ -53,12 +70,7 @@ typedef struct { SdbDecodeFp decodeFps[SDB_MAX]; } SSdbMgr; -typedef struct { - ESdbStatus status; - int32_t refCount; - int32_t dataLen; - char pData[]; -} SSdbRow; +extern SSdbMgr tsSdb; #ifdef __cplusplus } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 7489a46552..d8121df5a2 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -17,224 +17,7 @@ #include "sdbInt.h" #include "tglobal.h" -static SSdbMgr tsSdb = {0}; - -static int32_t sdbCreateDir() { - if (!taosMkDir(tsSdb.currDir)) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr()); - return -1; - } - - if (!taosMkDir(tsSdb.syncDir)) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr()); - return -1; - } - - if (!taosMkDir(tsSdb.tmpDir)) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr()); - return -1; - } - - return 0; -} - -static int32_t sdbRunDeployFp() { - for (int32_t i = SDB_START; i < SDB_MAX; ++i) { - SdbDeployFp fp = tsSdb.deployFps[i]; - if (fp == NULL) continue; - if ((*fp)() != 0) { - mError("failed to deploy sdb:%d since %s", i, terrstr()); - return -1; - } - } - - return 0; -} - -static SHashObj *sdbGetHash(int32_t sdb) { - if (sdb >= SDB_MAX || sdb <= SDB_START) { - terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; - return NULL; - } - - SHashObj *hash = tsSdb.hashObjs[sdb]; - if (hash == NULL) { - terrno = TSDB_CODE_SDB_APP_ERROR; - return NULL; - } - - return hash; -} - -int32_t sdbWrite(SSdbRaw *pRaw) { - SHashObj *hash = sdbGetHash(pRaw->type); - switch (pRaw->action) { - case SDB_ACTION_INSERT: - break; - case SDB_ACTION_UPDATE: - break; - case SDB_ACTION_DELETE: - break; - - default: - break; - } - - return 0; -} - -static int32_t sdbWriteVersion(FileFd fd) { return 0; } - -static int32_t sdbReadVersion(FileFd fd) { return 0; } - -static int32_t sdbReadDataFile() { - int32_t code = 0; - - SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); - if (pRaw == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - char file[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); - FileFd fd = taosOpenFileCreateWrite(file); - if (fd <= 0) { - code = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s for read since %s", file, tstrerror(code)); - return code; - } - - int64_t offset = 0; - while (1) { - int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw)); - if (ret == 0) break; - - if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, tstrerror(code)); - break; - } - - if (ret < sizeof(SSdbRaw)) { - code = TSDB_CODE_SDB_APP_ERROR; - mError("failed to read file:%s since %s", file, tstrerror(code)); - break; - } - - code = sdbWrite(pRaw); - if (code != 0) { - mError("failed to read file:%s since %s", file, tstrerror(code)); - goto PARSE_SDB_DATA_ERROR; - } - } - - code = 0; - -PARSE_SDB_DATA_ERROR: - taosCloseFile(fd); - return code; -} - -static int32_t sdbWriteDataFile() { - int32_t code = 0; - - char tmpfile[PATH_MAX] = {0}; - snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir); - - FileFd fd = taosOpenFileCreateWrite(tmpfile); - if (fd <= 0) { - code = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code)); - return code; - } - - for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { - SHashObj *hash = tsSdb.hashObjs[i]; - if (!hash) continue; - - SdbEncodeFp encodeFp = tsSdb.encodeFps[i]; - if (!encodeFp) continue; - - SSdbRow *pRow = taosHashIterate(hash, NULL); - while (pRow != NULL) { - if (pRow->status == SDB_STATUS_READY) continue; - SSdbRaw *pRaw = (*encodeFp)(pRow->pData); - if (pRaw != NULL) { - taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen); - } else { - taosHashCancelIterate(hash, pRow); - code = TSDB_CODE_SDB_APP_ERROR; - break; - } - - pRow = taosHashIterate(hash, pRow); - } - } - - if (code == 0) { - code = sdbWriteVersion(fd); - } - - taosCloseFile(fd); - - if (code == 0) { - code = taosFsyncFile(fd); - } - - if (code != 0) { - char curfile[PATH_MAX] = {0}; - snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); - code = taosRenameFile(tmpfile, curfile); - } - - if (code != 0) { - mError("failed to write sdb file since %s", tstrerror(code)); - } else { - mInfo("write sdb file successfully"); - } - - return code; -} - -int32_t sdbRead() { - int32_t code = sdbReadDataFile(); - if (code != 0) { - return code; - } - - mInfo("read sdb file successfully"); - return -1; -} - -int32_t sdbCommit() { - int32_t code = sdbWriteDataFile(); - if (code != 0) { - return code; - } - - return 0; -} - -int32_t sdbDeploy() { - if (sdbCreateDir() != 0) { - return -1; - } - - if (sdbRunDeployFp() != 0) { - return -1; - } - - if (sdbCommit() != 0) { - return -1; - } - - return 0; -} - -void sdbUnDeploy() {} +SSdbMgr tsSdb = {0}; int32_t sdbInit() { char path[PATH_MAX + 100]; @@ -309,108 +92,4 @@ void sdbSetTable(SSdbTable table) { tsSdb.deployFps[sdb] = table.deployFp; tsSdb.encodeFps[sdb] = table.encodeFp; tsSdb.decodeFps[sdb] = table.decodeFp; -} - -#if 0 -void *sdbInsertRow(ESdbType sdb, void *p) { - SdbHead *pHead = p; - pHead->type = sdb; - pHead->status = SDB_AVAIL; - - char *pKey = (char *)pHead + sizeof(pHead); - int32_t keySize; - EKeyType keyType = tsSdb.keyTypes[pHead->type]; - int32_t dataSize = tsSdb.dataSize[pHead->type]; - - SHashObj *hash = sdbGetHash(pHead->type); - if (hash == NULL) { - return NULL; - } - - if (keyType == SDBINT32) { - keySize = sizeof(int32_t); - } else if (keyType == SDB_KEY_BINARY) { - keySize = strlen(pKey) + 1; - } else { - keySize = sizeof(int64_t); - } - - taosHashPut(hash, pKey, keySize, pHead, dataSize); - return taosHashGet(hash, pKey, keySize); -} - -void sdbDeleteRow(ESdbType sdb, void *p) { - SdbHead *pHead = p; - pHead->status = SDB_STATUS_DROPPED; -} - -void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); } - -#endif - -void *sdbAcquire(ESdbType sdb, void *pKey) { - terrno = 0; - - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return NULL; - } - - int32_t keySize; - EKeyType keyType = tsSdb.keyTypes[sdb]; - - switch (keyType) { - case SDB_KEY_INT32: - keySize = sizeof(int32_t); - break; - case SDB_KEY_INT64: - keySize = sizeof(int64_t); - break; - case SDB_KEY_BINARY: - keySize = strlen(pKey) + 1; - break; - default: - keySize = sizeof(int32_t); - } - - SSdbRow *pRow = taosHashGet(hash, pKey, keySize); - if (pRow == NULL) return NULL; - - if (pRow->status == SDB_STATUS_READY) { - atomic_add_fetch_32(&pRow->refCount, 1); - return pRow->pData; - } else { - terrno = -1; // todo - return NULL; - } -} - -void sdbRelease(void *pObj) { - SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); - atomic_sub_fetch_32(&pRow->refCount, 1); -} - -void *sdbFetchRow(ESdbType sdb, void *pIter) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return NULL; - } - - return taosHashIterate(hash, pIter); -} - -void sdbCancelFetch(ESdbType sdb, void *pIter) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return; - } - taosHashCancelIterate(hash, pIter); -} - -int32_t sdbGetSize(ESdbType sdb) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return 0; - } - return taosHashGetSize(hash); } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c new file mode 100644 index 0000000000..8358b9aad1 --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" +#include "tglobal.h" + + +static int32_t sdbCreateDir() { + if (!taosMkDir(tsSdb.currDir)) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr()); + return -1; + } + + if (!taosMkDir(tsSdb.syncDir)) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr()); + return -1; + } + + if (!taosMkDir(tsSdb.tmpDir)) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr()); + return -1; + } + + return 0; +} + +static int32_t sdbRunDeployFp() { + for (int32_t i = SDB_START; i < SDB_MAX; ++i) { + SdbDeployFp fp = tsSdb.deployFps[i]; + if (fp == NULL) continue; + if ((*fp)() != 0) { + mError("failed to deploy sdb:%d since %s", i, terrstr()); + return -1; + } + } + + return 0; +} + +static int32_t sdbWriteVersion(FileFd fd) { return 0; } + +static int32_t sdbReadVersion(FileFd fd) { return 0; } + +static int32_t sdbReadDataFile() { + int32_t code = 0; + + SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); + if (pRaw == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + char file[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + FileFd fd = taosOpenFileCreateWrite(file); + if (fd <= 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s for read since %s", file, tstrerror(code)); + return code; + } + + int64_t offset = 0; + while (1) { + int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw)); + if (ret == 0) break; + + if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + if (ret < sizeof(SSdbRaw)) { + code = TSDB_CODE_SDB_APP_ERROR; + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + code = sdbWrite(pRaw); + if (code != 0) { + mError("failed to read file:%s since %s", file, tstrerror(code)); + goto PARSE_SDB_DATA_ERROR; + } + } + + code = 0; + +PARSE_SDB_DATA_ERROR: + taosCloseFile(fd); + return code; +} + +static int32_t sdbWriteDataFile() { + int32_t code = 0; + + char tmpfile[PATH_MAX] = {0}; + snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir); + + FileFd fd = taosOpenFileCreateWrite(tmpfile); + if (fd <= 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code)); + return code; + } + + for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { + SHashObj *hash = tsSdb.hashObjs[i]; + if (!hash) continue; + + SdbEncodeFp encodeFp = tsSdb.encodeFps[i]; + if (!encodeFp) continue; + + SSdbRow *pRow = taosHashIterate(hash, NULL); + while (pRow != NULL) { + if (pRow->status == SDB_STATUS_READY) continue; + SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); + if (pRaw != NULL) { + taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen); + } else { + taosHashCancelIterate(hash, pRow); + code = TSDB_CODE_SDB_APP_ERROR; + break; + } + + pRow = taosHashIterate(hash, pRow); + } + } + + if (code == 0) { + code = sdbWriteVersion(fd); + } + + taosCloseFile(fd); + + if (code == 0) { + code = taosFsyncFile(fd); + } + + if (code != 0) { + char curfile[PATH_MAX] = {0}; + snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); + code = taosRenameFile(tmpfile, curfile); + } + + if (code != 0) { + mError("failed to write sdb file since %s", tstrerror(code)); + } else { + mInfo("write sdb file successfully"); + } + + return code; +} + +int32_t sdbRead() { + int32_t code = sdbReadDataFile(); + if (code != 0) { + return code; + } + + mInfo("read sdb file successfully"); + return -1; +} + +int32_t sdbCommit() { + int32_t code = sdbWriteDataFile(); + if (code != 0) { + return code; + } + + return 0; +} + +int32_t sdbDeploy() { + if (sdbCreateDir() != 0) { + return -1; + } + + if (sdbRunDeployFp() != 0) { + return -1; + } + + if (sdbCommit() != 0) { + return -1; + } + + return 0; +} + +void sdbUnDeploy() {} diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c new file mode 100644 index 0000000000..dcadfdc4bc --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" +#include "tglobal.h" + +static SHashObj *sdbGetHash(int32_t sdb) { + if (sdb >= SDB_MAX || sdb <= SDB_START) { + terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; + return NULL; + } + + SHashObj *hash = tsSdb.hashObjs[sdb]; + if (hash == NULL) { + terrno = TSDB_CODE_SDB_APP_ERROR; + return NULL; + } + + return hash; +} + +static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { + SSdbRow *pDstRow = NULL; + if (pDstRow != NULL) { + terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; + return -1; + } + + pRow->refCount = 0; + pRow->status = pRaw->status; + + if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + SdbInsertFp insertFp = tsSdb.insertFps[pRaw->sdb]; + if (insertFp != NULL) { + if ((*insertFp)(pRow->pObj) != 0) { + taosHashRemove(hash, pRow->pObj, keySize); + return -1; + } + } + + return 0; +} + +static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { + SSdbRow *pDstRow = NULL; + if (pDstRow == NULL) { + terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; + return -1; + } + + SdbUpdateFp updateFp = tsSdb.updateFps[pRaw->sdb]; + if (updateFp != NULL) { + if ((*updateFp)(pRow->pObj, pDstRow->pObj) != 0) { + return -1; + } + } + + return 0; +} + +static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { + SSdbRow *pDstRow = NULL; + if (pDstRow == NULL) { + terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; + return -1; + } + + SdbDeleteFp deleteFp = tsSdb.deleteFps[pRaw->sdb]; + if (deleteFp != NULL) { + if ((*deleteFp)(pRow->pObj) != 0) { + return -1; + } + } + + taosHashRemove(hash, pRow->pObj, keySize); + return 0; +} + +int32_t sdbWrite(SSdbRaw *pRaw) { + SHashObj *hash = sdbGetHash(pRaw->sdb); + if (hash == NULL) return -1; + + SdbDecodeFp decodeFp = tsSdb.decodeFps[pRaw->sdb]; + SSdbRow *pRow = (*decodeFp)(pRaw); + if (pRow == NULL) { + terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; + return -1; + } + + int32_t keySize; + EKeyType keyType = tsSdb.keyTypes[pRaw->sdb]; + if (keyType == SDB_KEY_INT32) { + keySize = sizeof(int32_t); + } else if (keyType == SDB_KEY_BINARY) { + keySize = strlen(pRow->pObj) + 1; + } else { + keySize = sizeof(int64_t); + } + + int32_t code = -1; + if (pRaw->action == SDB_ACTION_INSERT) { + code = sdbInsertRow(hash, pRaw, pRow, keySize); + } else if (pRaw->action == SDB_ACTION_UPDATE) { + code = sdbUpdateRow(hash, pRaw, pRow, keySize); + } else if (pRaw->action == SDB_ACTION_DELETE) { + code = sdbDeleteRow(hash, pRaw, pRow, keySize); + } else { + terrno = TSDB_CODE_SDB_INVALID_ACTION_TYPE; + } + + if (code != 0) { + sdbFreeRow(pRow); + } + return 0; +} + +void *sdbAcquire(ESdbType sdb, void *pKey) { + terrno = 0; + + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return NULL; + } + + int32_t keySize; + EKeyType keyType = tsSdb.keyTypes[sdb]; + + switch (keyType) { + case SDB_KEY_INT32: + keySize = sizeof(int32_t); + break; + case SDB_KEY_INT64: + keySize = sizeof(int64_t); + break; + case SDB_KEY_BINARY: + keySize = strlen(pKey) + 1; + break; + default: + keySize = sizeof(int32_t); + } + + SSdbRow *pRow = taosHashGet(hash, pKey, keySize); + if (pRow == NULL) return NULL; + + if (pRow->status == SDB_STATUS_READY) { + atomic_add_fetch_32(&pRow->refCount, 1); + return pRow->pObj; + } else { + terrno = -1; // todo + return NULL; + } +} + +void sdbRelease(void *pObj) { + SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); + atomic_sub_fetch_32(&pRow->refCount, 1); +} + +void *sdbFetchRow(ESdbType sdb, void *pIter) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return NULL; + } + + return taosHashIterate(hash, pIter); +} + +void sdbCancelFetch(ESdbType sdb, void *pIter) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return; + } + taosHashCancelIterate(hash, pIter); +} + +int32_t sdbGetSize(ESdbType sdb) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return 0; + } + return taosHashGetSize(hash); +} diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c new file mode 100644 index 0000000000..8c550394ed --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" + +SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen) { + SSdbRaw *pRaw = calloc(1, dataLen + sizeof(SSdbRaw)); + if (pRaw == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pRaw->sdb = sdb; + pRaw->sver = sver; + pRaw->dataLen = dataLen; + return pRaw; +} + +void sdbFreeRaw(SSdbRaw *pRaw) { free(pRaw); } + +int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int8_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int8_t *)(pRaw->pData + dataPos) = val; + return 0; +} + +int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int32_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int32_t *)(pRaw->pData + dataPos) = val; + return 0; +} + +int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int64_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int64_t *)(pRaw->pData + dataPos) = val; + return 0; +} + +int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + valLen > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + memcpy(pRaw->pData + dataPos, pVal, valLen); + return 0; +} + +int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataLen > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + pRaw->dataLen = dataLen; + return 0; +} + +int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + pRaw->status = status; + return 0; +} + +int32_t sdbSetRawAction(SSdbRaw *pRaw, ESdbAction action) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + pRaw->action = action; + return 0; +} + +int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int8_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int8_t *)(pRaw->pData + dataPos); + return 0; +} + +int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int32_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int32_t *)(pRaw->pData + dataPos); + return 0; +} + +int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int64_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int64_t *)(pRaw->pData + dataPos); + return 0; +} + +int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + valLen > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + memcpy(pVal, pRaw->pData + dataPos, valLen); + return 0; +} + +int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + *sver = pRaw->sver; + return 0; +} + +int32_t sdbGetRawTotalSize(SSdbRaw *pRaw) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + return sizeof(SSdbRaw) + pRaw->dataLen; +} \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbRow.c b/source/dnode/mnode/sdb/src/sdbRow.c new file mode 100644 index 0000000000..68a0faa7b9 --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbRow.c @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" + +SSdbRow *sdbAllocRow(int32_t objSize) { + SSdbRow *pRow = calloc(1, objSize + sizeof(SSdbRow)); + if (pRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + return pRow; +} + +void *sdbGetRowObj(SSdbRow *pRow) { + if (pRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + return pRow->pObj; +} + +void sdbFreeRow(SSdbRow *pRow) { free(pRow); } diff --git a/source/dnode/mnode/transaction/inc/trnInt.h b/source/dnode/mnode/transaction/inc/trnInt.h index 03860734ee..771217dcc0 100644 --- a/source/dnode/mnode/transaction/inc/trnInt.h +++ b/source/dnode/mnode/transaction/inc/trnInt.h @@ -45,15 +45,15 @@ typedef enum { } ETrnStage; typedef struct STrans { - int32_t id; - ETrnStage stage; - ETrnPolicy policy; - void *rpcHandle; - SArray *redoLogs; - SArray *undoLogs; - SArray *commitLogs; - SArray *redoActions; - SArray *undoActions; + int32_t id; + int8_t stage; + int8_t policy; + void *rpcHandle; + SArray *redoLogs; + SArray *undoLogs; + SArray *commitLogs; + SArray *redoActions; + SArray *undoActions; } STrans; SSdbRaw *trnActionEncode(STrans *pTrans); diff --git a/source/dnode/mnode/transaction/src/trn.c b/source/dnode/mnode/transaction/src/trn.c index 1f65eb72ad..0d7c1a061e 100644 --- a/source/dnode/mnode/transaction/src/trn.c +++ b/source/dnode/mnode/transaction/src/trn.c @@ -16,8 +16,10 @@ #define _DEFAULT_SOURCE #include "trnInt.h" +#define SDB_TRANS_VER 1 + SSdbRaw *trnActionEncode(STrans *pTrans) { - int32_t rawDataLen = 5 * sizeof(int32_t); + int32_t rawDataLen = 10 * sizeof(int32_t); int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); @@ -25,91 +27,84 @@ SSdbRaw *trnActionEncode(STrans *pTrans) { int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); for (int32_t index = 0; index < redoLogNum; ++index) { - SSdbRaw *pRawData = taosArrayGet(pTrans->redoLogs, index); - rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + SSdbRaw *pRaw = taosArrayGet(pTrans->redoLogs, index); + rawDataLen += sdbGetRawTotalSize(pRaw); } for (int32_t index = 0; index < undoLogNum; ++index) { - SSdbRaw *pRawData = taosArrayGet(pTrans->undoLogs, index); - rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + SSdbRaw *pRaw = taosArrayGet(pTrans->undoLogs, index); + rawDataLen += sdbGetRawTotalSize(pRaw); } for (int32_t index = 0; index < commitLogNum; ++index) { - SSdbRaw *pRawData = taosArrayGet(pTrans->commitLogs, index); - rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + SSdbRaw *pRaw = taosArrayGet(pTrans->commitLogs, index); + rawDataLen += sdbGetRawTotalSize(pRaw); } - SSdbRaw *pRaw = calloc(1, rawDataLen + sizeof(SSdbRaw)); - if (pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen); + if (pRaw == NULL) return NULL; - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_INT32_VAL(pData, dataLen, redoLogNum) - SDB_SET_INT32_VAL(pData, dataLen, undoLogNum) - SDB_SET_INT32_VAL(pData, dataLen, commitLogNum) - SDB_SET_INT32_VAL(pData, dataLen, redoActionNum) - SDB_SET_INT32_VAL(pData, dataLen, undoActionNum) + int32_t dataPos = 0; + SDB_SET_INT32(pData, dataPos, pTrans->id) + SDB_SET_INT8(pData, dataPos, pTrans->stage) + SDB_SET_INT8(pData, dataPos, pTrans->policy) + SDB_SET_INT32(pData, dataPos, redoLogNum) + SDB_SET_INT32(pData, dataPos, undoLogNum) + SDB_SET_INT32(pData, dataPos, commitLogNum) + SDB_SET_INT32(pData, dataPos, redoActionNum) + SDB_SET_INT32(pData, dataPos, undoActionNum) + SDB_SET_DATALEN(pRaw, dataPos); - pRaw->dataLen = dataLen; - pRaw->type = SDB_TRANS; - pRaw->sver = TRN_VER; return pRaw; } STrans *trnActionDecode(SSdbRaw *pRaw) { - if (pRaw->sver != TRN_VER) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_TRANS_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } - STrans *pTrans = NULL; - if (pTrans == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); + STrans *pTrans = sdbGetRowObj(pRow); + if (pTrans == NULL) return NULL; int32_t redoLogNum = 0; int32_t undoLogNum = 0; int32_t commitLogNum = 0; int32_t redoActionNum = 0; int32_t undoActionNum = 0; - SSdbRaw *pTmp = malloc(sizeof(SSdbRaw)); - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_INT32_VAL(pData, dataLen, redoLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, undoLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, commitLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, redoActionNum, code) - SDB_GET_INT32_VAL(pData, dataLen, undoActionNum, code) + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id) + SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->stage) + SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->policy) + SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum) for (int32_t index = 0; index < redoLogNum; ++index) { - SDB_GET_BINARY_VAL(pData, dataLen, pTmp, sizeof(SSdbRaw), code); - if (code == 0 && pTmp->dataLen > 0) { - SSdbRaw *pRead = malloc(sizeof(SSdbRaw) + pTmp->dataLen); - if (pRead == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } - memcpy(pRead, pTmp, sizeof(SSdbRaw)); - SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); - void *ret = taosArrayPush(pTrans->redoLogs, &pRead); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } + int32_t dataLen = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) + + char *pData = malloc(dataLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->redoLogs, pData); + if (ret == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + break; } } - if (code != 0) { - trnDrop(pTrans); - terrno = code; - return NULL; - } + // if (code != 0) { + // trnDrop(pTrans); + // terrno = code; + // return NULL; + // } return pTrans; } diff --git a/source/dnode/mnode/transaction/src/trnExec.c b/source/dnode/mnode/transaction/src/trnExec.c index 6bb46f1283..fc15c16225 100644 --- a/source/dnode/mnode/transaction/src/trnExec.c +++ b/source/dnode/mnode/transaction/src/trnExec.c @@ -52,7 +52,7 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) { return 0; } - if (sdbWrite(pRaw) != 0) { + if (sdbWrite(pData) != 0) { code = terrno; trnSendRpcRsp(pData, code); terrno = code; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2053554666..d6f2f78d8e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -153,7 +153,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_ACTION_TYPE, "Invalid action type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len") -TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_META_ROW, "Invalid meta row") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "DNode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "DNode does not exist")