TD-2046
This commit is contained in:
parent
60146985ec
commit
f38b218247
|
@ -47,7 +47,7 @@ typedef enum {
|
|||
SDB_OPER_LOCAL
|
||||
} ESdbOper;
|
||||
|
||||
typedef struct SSdbOper {
|
||||
typedef struct SSWriteMsg {
|
||||
ESdbOper type;
|
||||
int32_t rowSize;
|
||||
int32_t retCode; // for callback in sdb queue
|
||||
|
@ -58,7 +58,7 @@ typedef struct SSdbOper {
|
|||
void * pObj;
|
||||
void * rowData;
|
||||
struct SMnodeMsg *pMsg;
|
||||
} SSdbOper;
|
||||
} SSWriteMsg;
|
||||
|
||||
typedef struct {
|
||||
char *tableName;
|
||||
|
@ -67,13 +67,13 @@ typedef struct {
|
|||
int32_t refCountPos;
|
||||
ESdbTable tableId;
|
||||
ESdbKey keyType;
|
||||
int32_t (*insertFp)(SSdbOper *pOper);
|
||||
int32_t (*deleteFp)(SSdbOper *pOper);
|
||||
int32_t (*updateFp)(SSdbOper *pOper);
|
||||
int32_t (*encodeFp)(SSdbOper *pOper);
|
||||
int32_t (*decodeFp)(SSdbOper *pDesc);
|
||||
int32_t (*destroyFp)(SSdbOper *pDesc);
|
||||
int32_t (*restoredFp)();
|
||||
int32_t (*fpInsert)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDelete)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpUpdate)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpEncode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDecode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDestroy)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDestored)();
|
||||
} SSdbTableDesc;
|
||||
|
||||
int32_t sdbInit();
|
||||
|
@ -84,10 +84,10 @@ bool sdbIsMaster();
|
|||
bool sdbIsServing();
|
||||
void sdbUpdateMnodeRoles();
|
||||
|
||||
int32_t sdbInsertRow(SSdbOper *pOper);
|
||||
int32_t sdbDeleteRow(SSdbOper *pOper);
|
||||
int32_t sdbUpdateRow(SSdbOper *pOper);
|
||||
int32_t sdbInsertRowImp(SSdbOper *pOper);
|
||||
int32_t sdbInsertRow(SSWriteMsg *pWrite);
|
||||
int32_t sdbDeleteRow(SSWriteMsg *pWrite);
|
||||
int32_t sdbUpdateRow(SSWriteMsg *pWrite);
|
||||
int32_t sdbInsertRowImp(SSWriteMsg *pWrite);
|
||||
|
||||
void *sdbGetRow(void *handle, void *key);
|
||||
void *sdbFetchRow(void *handle, void *pIter, void **ppRow);
|
||||
|
|
|
@ -31,14 +31,14 @@ void * tsAcctSdb = NULL;
|
|||
static int32_t tsAcctUpdateSize;
|
||||
static int32_t mnodeCreateRootAcct();
|
||||
|
||||
static int32_t mnodeAcctActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeAcctActionDestroy(SSWriteMsg *pOper) {
|
||||
SAcctObj *pAcct = pOper->pObj;
|
||||
pthread_mutex_destroy(&pAcct->mutex);
|
||||
tfree(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeAcctActionInsert(SSWriteMsg *pOper) {
|
||||
SAcctObj *pAcct = pOper->pObj;
|
||||
memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo));
|
||||
pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS;
|
||||
|
@ -46,14 +46,14 @@ static int32_t mnodeAcctActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeAcctActionDelete(SSWriteMsg *pOper) {
|
||||
SAcctObj *pAcct = pOper->pObj;
|
||||
mnodeDropAllUsers(pAcct);
|
||||
mnodeDropAllDbs(pAcct);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeAcctActionUpdate(SSWriteMsg *pOper) {
|
||||
SAcctObj *pAcct = pOper->pObj;
|
||||
SAcctObj *pSaved = mnodeGetAcct(pAcct->user);
|
||||
if (pAcct != pSaved) {
|
||||
|
@ -64,14 +64,14 @@ static int32_t mnodeAcctActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeAcctActionEncode(SSWriteMsg *pOper) {
|
||||
SAcctObj *pAcct = pOper->pObj;
|
||||
memcpy(pOper->rowData, pAcct, tsAcctUpdateSize);
|
||||
pOper->rowSize = tsAcctUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeAcctActionDecode(SSWriteMsg *pOper) {
|
||||
SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj));
|
||||
if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -106,13 +106,13 @@ int32_t mnodeInitAccts() {
|
|||
.maxRowSize = tsAcctUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mnodeAcctActionInsert,
|
||||
.deleteFp = mnodeAcctActionDelete,
|
||||
.updateFp = mnodeAcctActionUpdate,
|
||||
.encodeFp = mnodeAcctActionEncode,
|
||||
.decodeFp = mnodeAcctActionDecode,
|
||||
.destroyFp = mnodeAcctActionDestroy,
|
||||
.restoredFp = mnodeAcctActionRestored
|
||||
.fpInsert = mnodeAcctActionInsert,
|
||||
.fpDelete = mnodeAcctActionDelete,
|
||||
.fpUpdate = mnodeAcctActionUpdate,
|
||||
.fpEncode = mnodeAcctActionEncode,
|
||||
.fpDecode = mnodeAcctActionDecode,
|
||||
.fpDestroy = mnodeAcctActionDestroy,
|
||||
.fpDestored = mnodeAcctActionRestored
|
||||
};
|
||||
|
||||
tsAcctSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -226,7 +226,7 @@ static int32_t mnodeCreateRootAcct() {
|
|||
pAcct->acctId = sdbGetId(tsAcctSdb);
|
||||
pAcct->createdTime = taosGetTimestampMs();
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsAcctSdb,
|
||||
.pObj = pAcct,
|
||||
|
|
|
@ -32,31 +32,31 @@ static int32_t mnodeCreateCluster();
|
|||
static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||
static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
static int32_t mnodeClusterActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeClusterActionDestroy(SSWriteMsg *pOper) {
|
||||
tfree(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeClusterActionInsert(SSWriteMsg *pOper) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeClusterActionDelete(SSWriteMsg *pOper) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeClusterActionUpdate(SSWriteMsg *pOper) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeClusterActionEncode(SSWriteMsg *pOper) {
|
||||
SClusterObj *pCluster = pOper->pObj;
|
||||
memcpy(pOper->rowData, pCluster, tsClusterUpdateSize);
|
||||
pOper->rowSize = tsClusterUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeClusterActionDecode(SSWriteMsg *pOper) {
|
||||
SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj));
|
||||
if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -91,13 +91,13 @@ int32_t mnodeInitCluster() {
|
|||
.maxRowSize = tsClusterUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mnodeClusterActionInsert,
|
||||
.deleteFp = mnodeClusterActionDelete,
|
||||
.updateFp = mnodeClusterActionUpdate,
|
||||
.encodeFp = mnodeClusterActionEncode,
|
||||
.decodeFp = mnodeClusterActionDecode,
|
||||
.destroyFp = mnodeClusterActionDestroy,
|
||||
.restoredFp = mnodeClusterActionRestored
|
||||
.fpInsert = mnodeClusterActionInsert,
|
||||
.fpDelete = mnodeClusterActionDelete,
|
||||
.fpUpdate = mnodeClusterActionUpdate,
|
||||
.fpEncode = mnodeClusterActionEncode,
|
||||
.fpDecode = mnodeClusterActionDecode,
|
||||
.fpDestroy = mnodeClusterActionDestroy,
|
||||
.fpDestored = mnodeClusterActionRestored
|
||||
};
|
||||
|
||||
tsClusterSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -145,7 +145,7 @@ static int32_t mnodeCreateCluster() {
|
|||
mDebug("uid is %s", pCluster->uid);
|
||||
}
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsClusterSdb,
|
||||
.pObj = pCluster,
|
||||
|
|
|
@ -56,7 +56,7 @@ static void mnodeDestroyDb(SDbObj *pDb) {
|
|||
tfree(pDb);
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeDbActionDestroy(SSWriteMsg *pOper) {
|
||||
mnodeDestroyDb(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ int64_t mnodeGetDbNum() {
|
|||
return sdbGetNumOfRows(tsDbSdb);
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeDbActionInsert(SSWriteMsg *pOper) {
|
||||
SDbObj *pDb = pOper->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
|
||||
|
||||
|
@ -91,7 +91,7 @@ static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeDbActionDelete(SSWriteMsg *pOper) {
|
||||
SDbObj *pDb = pOper->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
|
||||
|
||||
|
@ -107,7 +107,7 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeDbActionUpdate(SSWriteMsg *pOper) {
|
||||
SDbObj *pNew = pOper->pObj;
|
||||
SDbObj *pDb = mnodeGetDb(pNew->name);
|
||||
if (pDb != NULL && pNew != pDb) {
|
||||
|
@ -120,14 +120,14 @@ static int32_t mnodeDbActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeDbActionEncode(SSWriteMsg *pOper) {
|
||||
SDbObj *pDb = pOper->pObj;
|
||||
memcpy(pOper->rowData, pDb, tsDbUpdateSize);
|
||||
pOper->rowSize = tsDbUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeDbActionDecode(SSWriteMsg *pOper) {
|
||||
SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj));
|
||||
if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -151,13 +151,13 @@ int32_t mnodeInitDbs() {
|
|||
.maxRowSize = tsDbUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mnodeDbActionInsert,
|
||||
.deleteFp = mnodeDbActionDelete,
|
||||
.updateFp = mnodeDbActionUpdate,
|
||||
.encodeFp = mnodeDbActionEncode,
|
||||
.decodeFp = mnodeDbActionDecode,
|
||||
.destroyFp = mnodeDbActionDestroy,
|
||||
.restoredFp = mnodeDbActionRestored
|
||||
.fpInsert = mnodeDbActionInsert,
|
||||
.fpDelete = mnodeDbActionDelete,
|
||||
.fpUpdate = mnodeDbActionUpdate,
|
||||
.fpEncode = mnodeDbActionEncode,
|
||||
.fpDecode = mnodeDbActionDecode,
|
||||
.fpDestroy = mnodeDbActionDestroy,
|
||||
.fpDestored = mnodeDbActionRestored
|
||||
};
|
||||
|
||||
tsDbSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -412,7 +412,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
|
|||
pMsg->pDb = pDb;
|
||||
mnodeIncDbRef(pDb);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
|
@ -807,7 +807,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
|
|||
if (pDb->status) return TSDB_CODE_SUCCESS;
|
||||
|
||||
pDb->status = true;
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb
|
||||
|
@ -1019,7 +1019,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
|
|||
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
|
||||
pDb->cfg = newCfg;
|
||||
pDb->cfgVersion++;
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
|
@ -1071,7 +1071,7 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
|
|||
SDbObj *pDb = pMsg->pDb;
|
||||
mInfo("db:%s, drop db from sdb", pDb->name);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
|
@ -1134,7 +1134,7 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
|
|||
|
||||
if (pDb->pAcct == pAcct) {
|
||||
mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user);
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb
|
||||
|
|
|
@ -87,12 +87,12 @@ static char* offlineReason[] = {
|
|||
"unknown",
|
||||
};
|
||||
|
||||
static int32_t mnodeDnodeActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pOper) {
|
||||
tfree(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeDnodeActionInsert(SSWriteMsg *pOper) {
|
||||
SDnodeObj *pDnode = pOper->pObj;
|
||||
if (pDnode->status != TAOS_DN_STATUS_DROPPING) {
|
||||
pDnode->status = TAOS_DN_STATUS_OFFLINE;
|
||||
|
@ -107,7 +107,7 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeDnodeActionDelete(SSWriteMsg *pOper) {
|
||||
SDnodeObj *pDnode = pOper->pObj;
|
||||
|
||||
#ifndef _SYNC
|
||||
|
@ -121,7 +121,7 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pOper) {
|
||||
SDnodeObj *pNew = pOper->pObj;
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
|
||||
if (pDnode != NULL && pNew != pDnode) {
|
||||
|
@ -134,14 +134,14 @@ static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeDnodeActionEncode(SSWriteMsg *pOper) {
|
||||
SDnodeObj *pDnode = pOper->pObj;
|
||||
memcpy(pOper->rowData, pDnode, tsDnodeUpdateSize);
|
||||
pOper->rowSize = tsDnodeUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeDnodeActionDecode(SSWriteMsg *pOper) {
|
||||
SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj));
|
||||
if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -178,13 +178,13 @@ int32_t mnodeInitDnodes() {
|
|||
.maxRowSize = tsDnodeUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_AUTO,
|
||||
.insertFp = mnodeDnodeActionInsert,
|
||||
.deleteFp = mnodeDnodeActionDelete,
|
||||
.updateFp = mnodeDnodeActionUpdate,
|
||||
.encodeFp = mnodeDnodeActionEncode,
|
||||
.decodeFp = mnodeDnodeActionDecode,
|
||||
.destroyFp = mnodeDnodeActionDestroy,
|
||||
.restoredFp = mnodeDnodeActionRestored
|
||||
.fpInsert = mnodeDnodeActionInsert,
|
||||
.fpDelete = mnodeDnodeActionDelete,
|
||||
.fpUpdate = mnodeDnodeActionUpdate,
|
||||
.fpEncode = mnodeDnodeActionEncode,
|
||||
.fpDecode = mnodeDnodeActionDecode,
|
||||
.fpDestroy = mnodeDnodeActionDestroy,
|
||||
.fpDestored = mnodeDnodeActionRestored
|
||||
};
|
||||
|
||||
tsDnodeSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -296,7 +296,7 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) {
|
|||
}
|
||||
|
||||
void mnodeUpdateDnode(SDnodeObj *pDnode) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDnodeSdb,
|
||||
.pObj = pDnode
|
||||
|
@ -644,7 +644,7 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
|
|||
tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN);
|
||||
taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDnodeSdb,
|
||||
.pObj = pDnode,
|
||||
|
@ -665,7 +665,7 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDnodeSdb,
|
||||
.pObj = pDnode,
|
||||
|
|
|
@ -58,12 +58,12 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
|
|||
#define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock)
|
||||
#endif
|
||||
|
||||
static int32_t mnodeMnodeActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pOper) {
|
||||
tfree(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeMnodeActionInsert(SSWriteMsg *pOper) {
|
||||
SMnodeObj *pMnode = pOper->pObj;
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
|
||||
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
|
@ -76,7 +76,7 @@ static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeMnodeActionDelete(SSWriteMsg *pOper) {
|
||||
SMnodeObj *pMnode = pOper->pObj;
|
||||
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
|
||||
|
@ -88,7 +88,7 @@ static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pOper) {
|
||||
SMnodeObj *pMnode = pOper->pObj;
|
||||
SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId);
|
||||
if (pMnode != pSaved) {
|
||||
|
@ -99,14 +99,14 @@ static int32_t mnodeMnodeActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeMnodeActionEncode(SSWriteMsg *pOper) {
|
||||
SMnodeObj *pMnode = pOper->pObj;
|
||||
memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize);
|
||||
pOper->rowSize = tsMnodeUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeMnodeActionDecode(SSWriteMsg *pOper) {
|
||||
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
|
||||
if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -144,13 +144,13 @@ int32_t mnodeInitMnodes() {
|
|||
.maxRowSize = tsMnodeUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_INT,
|
||||
.insertFp = mnodeMnodeActionInsert,
|
||||
.deleteFp = mnodeMnodeActionDelete,
|
||||
.updateFp = mnodeMnodeActionUpdate,
|
||||
.encodeFp = mnodeMnodeActionEncode,
|
||||
.decodeFp = mnodeMnodeActionDecode,
|
||||
.destroyFp = mnodeMnodeActionDestroy,
|
||||
.restoredFp = mnodeMnodeActionRestored
|
||||
.fpInsert = mnodeMnodeActionInsert,
|
||||
.fpDelete = mnodeMnodeActionDelete,
|
||||
.fpUpdate = mnodeMnodeActionUpdate,
|
||||
.fpEncode = mnodeMnodeActionEncode,
|
||||
.fpDecode = mnodeMnodeActionDecode,
|
||||
.fpDestroy = mnodeMnodeActionDestroy,
|
||||
.fpDestored = mnodeMnodeActionRestored
|
||||
};
|
||||
|
||||
tsMnodeSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -329,7 +329,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
|||
pMnode->mnodeId = dnodeId;
|
||||
pMnode->createdTime = taosGetTimestampMs();
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsMnodeSdb,
|
||||
.pObj = pMnode,
|
||||
|
@ -356,7 +356,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
|||
void mnodeDropMnodeLocal(int32_t dnodeId) {
|
||||
SMnodeObj *pMnode = mnodeGetMnode(dnodeId);
|
||||
if (pMnode != NULL) {
|
||||
SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode};
|
||||
SSWriteMsg oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode};
|
||||
sdbDeleteRow(&oper);
|
||||
mnodeDecMnodeRef(pMnode);
|
||||
}
|
||||
|
@ -371,7 +371,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
|
|||
return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsMnodeSdb,
|
||||
.pObj = pMnode
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef enum {
|
|||
SDB_STATUS_CLOSING
|
||||
} ESdbStatus;
|
||||
|
||||
typedef struct _SSdbTable {
|
||||
typedef struct SSdbTable {
|
||||
char tableName[SDB_TABLE_LEN];
|
||||
ESdbTable tableId;
|
||||
ESdbKey keyType;
|
||||
|
@ -58,13 +58,13 @@ typedef struct _SSdbTable {
|
|||
int32_t autoIndex;
|
||||
int64_t numOfRows;
|
||||
void * iHandle;
|
||||
int32_t (*insertFp)(SSdbOper *pDesc);
|
||||
int32_t (*deleteFp)(SSdbOper *pOper);
|
||||
int32_t (*updateFp)(SSdbOper *pOper);
|
||||
int32_t (*decodeFp)(SSdbOper *pOper);
|
||||
int32_t (*encodeFp)(SSdbOper *pOper);
|
||||
int32_t (*destroyFp)(SSdbOper *pOper);
|
||||
int32_t (*restoredFp)();
|
||||
int32_t (*fpInsert)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDelete)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpUpdate)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDecode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpEncode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDestroy)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDestored)();
|
||||
pthread_mutex_t mutex;
|
||||
} SSdbTable;
|
||||
|
||||
|
@ -83,33 +83,33 @@ typedef struct {
|
|||
typedef struct {
|
||||
pthread_t thread;
|
||||
int32_t workerId;
|
||||
} SSdbWriteWorker;
|
||||
} SSWriteWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SSdbWriteWorker *writeWorker;
|
||||
} SSdbWriteWorkerPool;
|
||||
SSWriteWorker *worker;
|
||||
} SSWriteWorkerPool;
|
||||
|
||||
extern void * tsMnodeTmr;
|
||||
static void * tsUpdateSyncTmr;
|
||||
static void * tsSdbTmr;
|
||||
static SSdbObject tsSdbObj = {0};
|
||||
static taos_qset tsSdbWriteQset;
|
||||
static taos_qall tsSdbWriteQall;
|
||||
static taos_queue tsSdbWriteQueue;
|
||||
static SSdbWriteWorkerPool tsSdbPool;
|
||||
static taos_qset tsSdbWQset;
|
||||
static taos_qall tsSdbWQall;
|
||||
static taos_queue tsSdbWQueue;
|
||||
static SSWriteWorkerPool tsSdbPool;
|
||||
|
||||
static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg);
|
||||
static int32_t sdbWrite(void *wparam, void *data, int32_t type, void *pMsg);
|
||||
static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg);
|
||||
static void * sdbWorkerFp(void *param);
|
||||
static int32_t sdbInitWriteWorker();
|
||||
static void sdbCleanupWriteWorker();
|
||||
static int32_t sdbAllocWriteQueue();
|
||||
static void sdbFreeWritequeue();
|
||||
static int32_t sdbUpdateRowImp(SSdbOper *pOper);
|
||||
static int32_t sdbDeleteRowImp(SSdbOper *pOper);
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper);
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper);
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper);
|
||||
static int32_t sdbUpdateRowImp(SSWriteMsg *pWrite);
|
||||
static int32_t sdbDeleteRowImp(SSWriteMsg *pWrite);
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
||||
|
||||
int32_t sdbGetId(void *handle) {
|
||||
return ((SSdbTable *)handle)->autoIndex;
|
||||
|
@ -199,8 +199,8 @@ static void sdbRestoreTables() {
|
|||
for (int32_t tableId = 0; tableId < SDB_TABLE_MAX; ++tableId) {
|
||||
SSdbTable *pTable = sdbGetTableFromId(tableId);
|
||||
if (pTable == NULL) continue;
|
||||
if (pTable->restoredFp) {
|
||||
(*pTable->restoredFp)();
|
||||
if (pTable->fpDestored) {
|
||||
(*pTable->fpDestored)();
|
||||
}
|
||||
|
||||
totalRows += pTable->numOfRows;
|
||||
|
@ -255,11 +255,11 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
|
|||
FORCE_INLINE
|
||||
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
||||
assert(param);
|
||||
SSdbOper * pOper = param;
|
||||
SMnodeMsg *pMsg = pOper->pMsg;
|
||||
if (code <= 0) pOper->retCode = code;
|
||||
SSWriteMsg * pWrite = param;
|
||||
SMnodeMsg *pMsg = pWrite->pMsg;
|
||||
if (code <= 0) pWrite->retCode = code;
|
||||
|
||||
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
|
||||
int32_t processedCount = atomic_add_fetch_32(&pWrite->processedCount, 1);
|
||||
if (processedCount <= 1) {
|
||||
if (pMsg != NULL) {
|
||||
sdbDebug("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, processedCount, code);
|
||||
|
@ -272,40 +272,40 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
|||
}
|
||||
|
||||
// failed to forward, need revert insert
|
||||
if (pOper->retCode != TSDB_CODE_SUCCESS) {
|
||||
SWalHead *pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
if (pWrite->retCode != TSDB_CODE_SUCCESS) {
|
||||
SWalHead *pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pOper->pObj,
|
||||
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version, action, tstrerror(pOper->retCode));
|
||||
sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pWrite->pObj,
|
||||
sdbGetKeyStr(pWrite->table, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode));
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
// It's better to create a table in two stages, create it first and then set it success
|
||||
//sdbDeleteHash(pOper->table, pOper);
|
||||
SSdbOper oper = {
|
||||
//sdbDeleteHash(pWrite->table, pWrite);
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = pOper->table,
|
||||
.pObj = pOper->pObj
|
||||
.table = pWrite->table,
|
||||
.pObj = pWrite->pObj
|
||||
};
|
||||
sdbDeleteRow(&oper);
|
||||
}
|
||||
}
|
||||
|
||||
if (pOper->writeCb != NULL) {
|
||||
pOper->retCode = (*pOper->writeCb)(pMsg, pOper->retCode);
|
||||
if (pWrite->writeCb != NULL) {
|
||||
pWrite->retCode = (*pWrite->writeCb)(pMsg, pWrite->retCode);
|
||||
}
|
||||
dnodeSendRpcMWriteRsp(pMsg, pOper->retCode);
|
||||
dnodeSendRpcMWriteRsp(pMsg, pWrite->retCode);
|
||||
|
||||
// if ahandle, means this func is called by sdb write
|
||||
if (ahandle == NULL) {
|
||||
sdbDecRef(pOper->table, pOper->pObj);
|
||||
sdbDecRef(pWrite->table, pWrite->pObj);
|
||||
}
|
||||
|
||||
taosFreeQitem(pOper);
|
||||
taosFreeQitem(pWrite);
|
||||
}
|
||||
|
||||
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
|
||||
|
||||
void sdbUpdateAsync() {
|
||||
taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
|
||||
taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsSdbTmr);
|
||||
}
|
||||
|
||||
void sdbUpdateSync(void *pMnodes) {
|
||||
|
@ -462,8 +462,8 @@ void sdbDecRef(void *handle, void *pObj) {
|
|||
int32_t *updateEnd = pObj + pTable->refCountPos - 4;
|
||||
if (refCount <= 0 && *updateEnd) {
|
||||
sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount);
|
||||
SSdbOper oper = {.pObj = pObj};
|
||||
(*pTable->destroyFp)(&oper);
|
||||
SSWriteMsg oper = {.pObj = pObj};
|
||||
(*pTable->fpDestroy)(&oper);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -500,8 +500,8 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
|
|||
return sdbGetRow(pTable, sdbGetObjKey(pTable, key));
|
||||
}
|
||||
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
||||
void * key = sdbGetObjKey(pTable, pWrite->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
|
@ -509,43 +509,43 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(int64_t));
|
||||
taosHashPut(pTable->iHandle, key, keySize, &pWrite->pObj, sizeof(int64_t));
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbIncRef(pTable, pOper->pObj);
|
||||
sdbIncRef(pTable, pWrite->pObj);
|
||||
atomic_add_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj));
|
||||
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pWrite->pObj));
|
||||
} else {
|
||||
atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
|
||||
sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->tableName,
|
||||
sdbGetObjStr(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, pOper->pMsg);
|
||||
sdbGetObjStr(pTable, pWrite->pObj), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg);
|
||||
|
||||
int32_t code = (*pTable->insertFp)(pOper);
|
||||
int32_t code = (*pTable->fpInsert)(pWrite);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->tableName,
|
||||
sdbGetObjStr(pTable, pOper->pObj));
|
||||
sdbDeleteHash(pTable, pOper);
|
||||
sdbGetObjStr(pTable, pWrite->pObj));
|
||||
sdbDeleteHash(pTable, pWrite);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
|
||||
int32_t *updateEnd = pOper->pObj + pTable->refCountPos - 4;
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
||||
int32_t *updateEnd = pWrite->pObj + pTable->refCountPos - 4;
|
||||
bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0;
|
||||
if (!set) {
|
||||
sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->tableName,
|
||||
sdbGetObjStr(pTable, pOper->pObj));
|
||||
sdbGetObjStr(pTable, pWrite->pObj));
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
(*pTable->deleteFp)(pOper);
|
||||
(*pTable->fpDelete)(pWrite);
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
void * key = sdbGetObjKey(pTable, pWrite->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
keySize = strlen((char *)key);
|
||||
|
@ -558,23 +558,23 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
atomic_sub_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
|
||||
sdbGetObjStr(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg);
|
||||
sdbGetObjStr(pTable, pWrite->pObj), pTable->numOfRows, pWrite->pMsg);
|
||||
|
||||
sdbDecRef(pTable, pOper->pObj);
|
||||
sdbDecRef(pTable, pWrite->pObj);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
||||
sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
|
||||
sdbGetObjStr(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg);
|
||||
sdbGetObjStr(pTable, pWrite->pObj), pTable->numOfRows, pWrite->pMsg);
|
||||
|
||||
(*pTable->updateFp)(pOper);
|
||||
(*pTable->fpUpdate)(pWrite);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
|
||||
SSdbOper *pOper = param;
|
||||
SSWriteMsg *pWrite = param;
|
||||
SWalHead *pHead = data;
|
||||
int32_t tableId = pHead->msgType / 10;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
|
@ -614,21 +614,21 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
|
|||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||
|
||||
// from app, oper is created
|
||||
if (pOper != NULL) {
|
||||
if (pWrite != NULL) {
|
||||
// forward to peers
|
||||
pOper->processedCount = 0;
|
||||
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
||||
if (syncCode <= 0) pOper->processedCount = 1;
|
||||
pWrite->processedCount = 0;
|
||||
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pWrite, TAOS_QTYPE_RPC);
|
||||
if (syncCode <= 0) pWrite->processedCount = 1;
|
||||
|
||||
if (syncCode < 0) {
|
||||
sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName,
|
||||
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg);
|
||||
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
|
||||
} else if (syncCode > 0) {
|
||||
sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg);
|
||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
|
||||
} else {
|
||||
sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg);
|
||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
|
||||
}
|
||||
return syncCode;
|
||||
}
|
||||
|
@ -637,12 +637,12 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
|
|||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||
|
||||
// even it is WAL/FWD, it shall be called to update version in sync
|
||||
syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
|
||||
syncForwardToPeer(tsSdbObj.sync, pHead, pWrite, TAOS_QTYPE_RPC);
|
||||
|
||||
// from wal or forward msg, oper not created, should add into hash
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
||||
code = (*pTable->decodeFp)(&oper);
|
||||
SSWriteMsg oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
||||
code = (*pTable->fpDecode)(&oper);
|
||||
return sdbInsertHash(pTable, &oper);
|
||||
} else if (action == SDB_ACTION_DELETE) {
|
||||
void *pRow = sdbGetRowMeta(pTable, pHead->cont);
|
||||
|
@ -651,7 +651,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
|
|||
sdbGetKeyStr(pTable, pHead->cont));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SSdbOper oper = {.table = pTable, .pObj = pRow};
|
||||
SSWriteMsg oper = {.table = pTable, .pObj = pRow};
|
||||
return sdbDeleteHash(pTable, &oper);
|
||||
} else if (action == SDB_ACTION_UPDATE) {
|
||||
void *pRow = sdbGetRowMeta(pTable, pHead->cont);
|
||||
|
@ -660,77 +660,77 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
|
|||
sdbGetKeyStr(pTable, pHead->cont));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
||||
code = (*pTable->decodeFp)(&oper);
|
||||
SSWriteMsg oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
|
||||
code = (*pTable->fpDecode)(&oper);
|
||||
return sdbUpdateHash(pTable, &oper);
|
||||
} else {
|
||||
return TSDB_CODE_MND_INVALID_MSG_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sdbInsertRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
int32_t sdbInsertRow(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = (SSdbTable *)pWrite->table;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
if (sdbGetRowFromObj(pTable, pOper->pObj)) {
|
||||
if (sdbGetRowFromObj(pTable, pWrite->pObj)) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert key:%s, already exist", pTable->tableName,
|
||||
sdbGetObjStr(pTable, pOper->pObj));
|
||||
sdbDecRef(pTable, pOper->pObj);
|
||||
sdbGetObjStr(pTable, pWrite->pObj));
|
||||
sdbDecRef(pTable, pWrite->pObj);
|
||||
return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE;
|
||||
}
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
*((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
*((uint32_t *)pWrite->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
|
||||
// let vgId increase from 2
|
||||
if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
|
||||
*((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
*((uint32_t *)pWrite->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = sdbInsertHash(pTable, pOper);
|
||||
int32_t code = sdbInsertHash(pTable, pWrite);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert into hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just insert data into memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
if (pWrite->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pOper->reqFp) {
|
||||
return (*pOper->reqFp)(pOper->pMsg);
|
||||
if (pWrite->reqFp) {
|
||||
return (*pWrite->reqFp)(pWrite->pMsg);
|
||||
} else {
|
||||
return sdbInsertRowImp(pOper);
|
||||
return sdbInsertRowImp(pWrite);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sdbInsertRowImp(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = (SSdbTable *)pWrite->table;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSWriteMsg *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
|
||||
pHead->version = 0;
|
||||
pHead->len = pOper->rowSize;
|
||||
pHead->len = pWrite->rowSize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
pWrite->rowData = pHead->cont;
|
||||
(*pTable->fpEncode)(pWrite);
|
||||
pHead->len = pWrite->rowSize;
|
||||
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
memcpy(pNewOper, pWrite, sizeof(SSWriteMsg));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
|
||||
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetObjStr(pTable, pOper->pObj));
|
||||
pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj));
|
||||
}
|
||||
|
||||
sdbIncRef(pNewOper->table, pNewOper->pObj);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
@ -743,117 +743,117 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
|
|||
return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1;
|
||||
}
|
||||
|
||||
int32_t sdbDeleteRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = (SSdbTable *)pWrite->table;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
|
||||
void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj);
|
||||
if (pRow == NULL) {
|
||||
sdbDebug("vgId:1, sdb:%s, record is not there, delete failed", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
sdbIncRef(pTable, pOper->pObj);
|
||||
sdbIncRef(pTable, pWrite->pObj);
|
||||
|
||||
int32_t code = sdbDeleteHash(pTable, pOper);
|
||||
int32_t code = sdbDeleteHash(pTable, pWrite);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to delete from hash", pTable->tableName);
|
||||
sdbDecRef(pTable, pOper->pObj);
|
||||
sdbDecRef(pTable, pWrite->pObj);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just delete data from memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
sdbDecRef(pTable, pOper->pObj);
|
||||
if (pWrite->type != SDB_OPER_GLOBAL) {
|
||||
sdbDecRef(pTable, pWrite->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pOper->reqFp) {
|
||||
return (*pOper->reqFp)(pOper->pMsg);
|
||||
if (pWrite->reqFp) {
|
||||
return (*pWrite->reqFp)(pWrite->pMsg);
|
||||
} else {
|
||||
return sdbDeleteRowImp(pOper);
|
||||
return sdbDeleteRowImp(pWrite);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sdbDeleteRowImp(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = (SSdbTable *)pWrite->table;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSWriteMsg *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
|
||||
pHead->version = 0;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
pWrite->rowData = pHead->cont;
|
||||
(*pTable->fpEncode)(pWrite);
|
||||
pHead->len = pWrite->rowSize;
|
||||
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
memcpy(pNewOper, pWrite, sizeof(SSWriteMsg));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
|
||||
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetObjStr(pTable, pOper->pObj));
|
||||
pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj));
|
||||
}
|
||||
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
int32_t sdbUpdateRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = (SSdbTable *)pWrite->table;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
|
||||
void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj);
|
||||
if (pRow == NULL) {
|
||||
sdbDebug("vgId:1, sdb:%s, record is not there, update failed", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
int32_t code = sdbUpdateHash(pTable, pOper);
|
||||
int32_t code = sdbUpdateHash(pTable, pWrite);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to update hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just update data in memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
if (pWrite->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pOper->reqFp) {
|
||||
return (*pOper->reqFp)(pOper->pMsg);
|
||||
if (pWrite->reqFp) {
|
||||
return (*pWrite->reqFp)(pWrite->pMsg);
|
||||
} else {
|
||||
return sdbUpdateRowImp(pOper);
|
||||
return sdbUpdateRowImp(pWrite);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sdbUpdateRowImp(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = (SSdbTable *)pWrite->table;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSWriteMsg *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
|
||||
pHead->version = 0;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
pWrite->rowData = pHead->cont;
|
||||
(*pTable->fpEncode)(pWrite);
|
||||
pHead->len = pWrite->rowSize;
|
||||
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
memcpy(pNewOper, pWrite, sizeof(SSWriteMsg));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
|
||||
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetObjStr(pTable, pOper->pObj));
|
||||
pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj));
|
||||
}
|
||||
|
||||
sdbIncRef(pNewOper->table, pNewOper->pObj);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
@ -903,13 +903,13 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
|
|||
pTable->hashSessions = pDesc->hashSessions;
|
||||
pTable->maxRowSize = pDesc->maxRowSize;
|
||||
pTable->refCountPos = pDesc->refCountPos;
|
||||
pTable->insertFp = pDesc->insertFp;
|
||||
pTable->deleteFp = pDesc->deleteFp;
|
||||
pTable->updateFp = pDesc->updateFp;
|
||||
pTable->encodeFp = pDesc->encodeFp;
|
||||
pTable->decodeFp = pDesc->decodeFp;
|
||||
pTable->destroyFp = pDesc->destroyFp;
|
||||
pTable->restoredFp = pDesc->restoredFp;
|
||||
pTable->fpInsert = pDesc->fpInsert;
|
||||
pTable->fpDelete = pDesc->fpDelete;
|
||||
pTable->fpUpdate = pDesc->fpUpdate;
|
||||
pTable->fpEncode = pDesc->fpEncode;
|
||||
pTable->fpDecode = pDesc->fpDecode;
|
||||
pTable->fpDestroy = pDesc->fpDestroy;
|
||||
pTable->fpDestored = pDesc->fpDestored;
|
||||
|
||||
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
|
@ -934,12 +934,12 @@ void sdbCloseTable(void *handle) {
|
|||
void **ppRow = taosHashIterGet(pIter);
|
||||
if (ppRow == NULL) continue;
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.pObj = *ppRow,
|
||||
.table = pTable,
|
||||
};
|
||||
|
||||
(*pTable->destroyFp)(&oper);
|
||||
(*pTable->fpDestroy)(&oper);
|
||||
}
|
||||
|
||||
taosHashDestroyIter(pIter);
|
||||
|
@ -952,11 +952,11 @@ void sdbCloseTable(void *handle) {
|
|||
|
||||
int32_t sdbInitWriteWorker() {
|
||||
tsSdbPool.num = 1;
|
||||
tsSdbPool.writeWorker = (SSdbWriteWorker *)calloc(sizeof(SSdbWriteWorker), tsSdbPool.num);
|
||||
tsSdbPool.worker = (SSWriteWorker *)calloc(sizeof(SSWriteWorker), tsSdbPool.num);
|
||||
|
||||
if (tsSdbPool.writeWorker == NULL) return -1;
|
||||
if (tsSdbPool.worker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
SSWriteWorker *pWorker = tsSdbPool.worker + i;
|
||||
pWorker->workerId = i;
|
||||
}
|
||||
|
||||
|
@ -968,45 +968,45 @@ int32_t sdbInitWriteWorker() {
|
|||
|
||||
void sdbCleanupWriteWorker() {
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
SSWriteWorker *pWorker = tsSdbPool.worker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsSdbWriteQset);
|
||||
taosQsetThreadResume(tsSdbWQset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
SSWriteWorker *pWorker = tsSdbPool.worker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
sdbFreeWritequeue();
|
||||
tfree(tsSdbPool.writeWorker);
|
||||
tfree(tsSdbPool.worker);
|
||||
|
||||
mInfo("vgId:1, sdb write is closed");
|
||||
}
|
||||
|
||||
int32_t sdbAllocWriteQueue() {
|
||||
tsSdbWriteQueue = taosOpenQueue();
|
||||
if (tsSdbWriteQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
tsSdbWQueue = taosOpenQueue();
|
||||
if (tsSdbWQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
tsSdbWriteQset = taosOpenQset();
|
||||
if (tsSdbWriteQset == NULL) {
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
tsSdbWQset = taosOpenQset();
|
||||
if (tsSdbWQset == NULL) {
|
||||
taosCloseQueue(tsSdbWQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
taosAddIntoQset(tsSdbWriteQset, tsSdbWriteQueue, NULL);
|
||||
taosAddIntoQset(tsSdbWQset, tsSdbWQueue, NULL);
|
||||
|
||||
tsSdbWriteQall = taosAllocateQall();
|
||||
if (tsSdbWriteQall == NULL) {
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
tsSdbWQall = taosAllocateQall();
|
||||
if (tsSdbWQall == NULL) {
|
||||
taosCloseQset(tsSdbWQset);
|
||||
taosCloseQueue(tsSdbWQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
SSWriteWorker *pWorker = tsSdbPool.worker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
|
@ -1015,9 +1015,9 @@ int32_t sdbAllocWriteQueue() {
|
|||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, sdbWorkerFp, pWorker) != 0) {
|
||||
mError("failed to create thread to process sdb write queue, reason:%s", strerror(errno));
|
||||
taosFreeQall(tsSdbWriteQall);
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
taosFreeQall(tsSdbWQall);
|
||||
taosCloseQset(tsSdbWQset);
|
||||
taosCloseQueue(tsSdbWQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -1025,17 +1025,17 @@ int32_t sdbAllocWriteQueue() {
|
|||
mDebug("sdb write worker:%d is launched, total:%d", pWorker->workerId, tsSdbPool.num);
|
||||
}
|
||||
|
||||
mDebug("sdb write queue:%p is allocated", tsSdbWriteQueue);
|
||||
mDebug("sdb write queue:%p is allocated", tsSdbWQueue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void sdbFreeWritequeue() {
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
taosFreeQall(tsSdbWriteQall);
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
tsSdbWriteQall = NULL;
|
||||
tsSdbWriteQset = NULL;
|
||||
tsSdbWriteQueue = NULL;
|
||||
taosCloseQueue(tsSdbWQueue);
|
||||
taosFreeQall(tsSdbWQall);
|
||||
taosCloseQset(tsSdbWQset);
|
||||
tsSdbWQall = NULL;
|
||||
tsSdbWQset = NULL;
|
||||
tsSdbWQueue = NULL;
|
||||
}
|
||||
|
||||
int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
|
||||
|
@ -1044,45 +1044,45 @@ int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
|
|||
SWalHead *pWal = taosAllocateQitem(size);
|
||||
memcpy(pWal, pHead, size);
|
||||
|
||||
taosWriteQitem(tsSdbWriteQueue, qtype, pWal);
|
||||
taosWriteQitem(tsSdbWQueue, qtype, pWal);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *sdbWorkerFp(void *param) {
|
||||
SWalHead *pHead;
|
||||
SSdbOper *pOper;
|
||||
SSWriteMsg *pWrite;
|
||||
int32_t type;
|
||||
int32_t numOfMsgs;
|
||||
void * item;
|
||||
void * unUsed;
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
|
||||
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
|
||||
if (numOfMsgs == 0) {
|
||||
sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWriteQset);
|
||||
sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWQset);
|
||||
break;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWriteQall, &type, &item);
|
||||
taosGetQitem(tsSdbWQall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
pOper->processedCount = 1;
|
||||
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
if (pOper->pMsg != NULL) {
|
||||
pWrite = (SSWriteMsg *)item;
|
||||
pWrite->processedCount = 1;
|
||||
pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
|
||||
if (pWrite->pMsg != NULL) {
|
||||
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue",
|
||||
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
|
||||
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
|
||||
pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, ((SSdbTable *)pWrite->table)->tableName, pWrite->pObj,
|
||||
sdbGetKeyStr(pWrite->table, pHead->cont), pHead->version);
|
||||
}
|
||||
} else {
|
||||
pHead = (SWalHead *)item;
|
||||
pOper = NULL;
|
||||
pWrite = NULL;
|
||||
}
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, type, NULL);
|
||||
int32_t code = sdbWrite(pWrite, pHead, type, NULL);
|
||||
if (code > 0) code = 0;
|
||||
if (pOper) {
|
||||
pOper->retCode = code;
|
||||
if (pWrite) {
|
||||
pWrite->retCode = code;
|
||||
} else {
|
||||
pHead->len = code; // hackway
|
||||
}
|
||||
|
@ -1091,13 +1091,13 @@ static void *sdbWorkerFp(void *param) {
|
|||
walFsync(tsSdbObj.wal, true);
|
||||
|
||||
// browse all items, and process them one by one
|
||||
taosResetQitems(tsSdbWriteQall);
|
||||
taosResetQitems(tsSdbWQall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWriteQall, &type, &item);
|
||||
taosGetQitem(tsSdbWQall, &type, &item);
|
||||
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
sdbConfirmForward(NULL, pOper, pOper->retCode);
|
||||
pWrite = (SSWriteMsg *)item;
|
||||
sdbConfirmForward(NULL, pWrite, pWrite->retCode);
|
||||
} else if (type == TAOS_QTYPE_FWD) {
|
||||
pHead = (SWalHead *)item;
|
||||
syncConfirmForward(tsSdbObj.sync, pHead->version, pHead->len);
|
||||
|
|
|
@ -99,12 +99,12 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
|
|||
tfree(pTable);
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pOper) {
|
||||
mnodeDestroyChildTable(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeChildTableActionInsert(SSWriteMsg *pOper) {
|
||||
SCTableObj *pTable = pOper->pObj;
|
||||
|
||||
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
|
||||
|
@ -153,7 +153,7 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeChildTableActionDelete(SSWriteMsg *pOper) {
|
||||
SCTableObj *pTable = pOper->pObj;
|
||||
if (pTable->vgId == 0) {
|
||||
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
||||
|
@ -189,7 +189,7 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pOper) {
|
||||
SCTableObj *pNew = pOper->pObj;
|
||||
SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId);
|
||||
if (pTable != pNew) {
|
||||
|
@ -216,7 +216,7 @@ static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeChildTableActionEncode(SSWriteMsg *pOper) {
|
||||
SCTableObj *pTable = pOper->pObj;
|
||||
assert(pTable != NULL && pOper->rowData != NULL);
|
||||
|
||||
|
@ -246,7 +246,7 @@ static int32_t mnodeChildTableActionEncode(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) {
|
||||
assert(pOper->rowData != NULL);
|
||||
SCTableObj *pTable = calloc(1, sizeof(SCTableObj));
|
||||
if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -297,7 +297,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId);
|
||||
if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId);
|
||||
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
mnodeDecDbRef(pDb);
|
||||
|
@ -309,7 +309,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
if (pVgroup == NULL) {
|
||||
mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
|
@ -320,7 +320,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it",
|
||||
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
|
@ -331,7 +331,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
if (pSuperTable == NULL) {
|
||||
mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
|
@ -358,13 +358,13 @@ static int32_t mnodeInitChildTables() {
|
|||
.maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_VAR_STRING,
|
||||
.insertFp = mnodeChildTableActionInsert,
|
||||
.deleteFp = mnodeChildTableActionDelete,
|
||||
.updateFp = mnodeChildTableActionUpdate,
|
||||
.encodeFp = mnodeChildTableActionEncode,
|
||||
.decodeFp = mnodeChildTableActionDecode,
|
||||
.destroyFp = mnodeChildTableActionDestroy,
|
||||
.restoredFp = mnodeChildTableActionRestored
|
||||
.fpInsert = mnodeChildTableActionInsert,
|
||||
.fpDelete = mnodeChildTableActionDelete,
|
||||
.fpUpdate = mnodeChildTableActionUpdate,
|
||||
.fpEncode = mnodeChildTableActionEncode,
|
||||
.fpDecode = mnodeChildTableActionDecode,
|
||||
.fpDestroy = mnodeChildTableActionDestroy,
|
||||
.fpDestored = mnodeChildTableActionRestored
|
||||
};
|
||||
|
||||
tsChildTableSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -430,12 +430,12 @@ static void mnodeDestroySuperTable(SSTableObj *pStable) {
|
|||
tfree(pStable);
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pOper) {
|
||||
mnodeDestroySuperTable(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pOper) {
|
||||
SSTableObj *pStable = pOper->pObj;
|
||||
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
|
||||
if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) {
|
||||
|
@ -446,7 +446,7 @@ static int32_t mnodeSuperTableActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pOper) {
|
||||
SSTableObj *pStable = pOper->pObj;
|
||||
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
|
||||
if (pDb != NULL) {
|
||||
|
@ -458,7 +458,7 @@ static int32_t mnodeSuperTableActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pOper) {
|
||||
SSTableObj *pNew = pOper->pObj;
|
||||
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
|
||||
if (pTable != NULL && pTable != pNew) {
|
||||
|
@ -483,7 +483,7 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pOper) {
|
||||
SSTableObj *pStable = pOper->pObj;
|
||||
assert(pOper->pObj != NULL && pOper->rowData != NULL);
|
||||
|
||||
|
@ -506,7 +506,7 @@ static int32_t mnodeSuperTableActionEncode(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pOper) {
|
||||
assert(pOper->rowData != NULL);
|
||||
SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj));
|
||||
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -551,13 +551,13 @@ static int32_t mnodeInitSuperTables() {
|
|||
.maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_VAR_STRING,
|
||||
.insertFp = mnodeSuperTableActionInsert,
|
||||
.deleteFp = mnodeSuperTableActionDelete,
|
||||
.updateFp = mnodeSuperTableActionUpdate,
|
||||
.encodeFp = mnodeSuperTableActionEncode,
|
||||
.decodeFp = mnodeSuperTableActionDecode,
|
||||
.destroyFp = mnodeSuperTableActionDestroy,
|
||||
.restoredFp = mnodeSuperTableActionRestored
|
||||
.fpInsert = mnodeSuperTableActionInsert,
|
||||
.fpDelete = mnodeSuperTableActionDelete,
|
||||
.fpUpdate = mnodeSuperTableActionUpdate,
|
||||
.fpEncode = mnodeSuperTableActionEncode,
|
||||
.fpDecode = mnodeSuperTableActionDecode,
|
||||
.fpDestroy = mnodeSuperTableActionDestroy,
|
||||
.fpDestored = mnodeSuperTableActionRestored
|
||||
};
|
||||
|
||||
tsSuperTableSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -828,7 +828,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
} else {
|
||||
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
tstrerror(code));
|
||||
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
}
|
||||
|
||||
|
@ -878,7 +878,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
pMsg->pTable = (STableObj *)pStable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -937,7 +937,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
mnodeDropAllChildTablesInStable(pStable);
|
||||
}
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1010,7 +1010,7 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
|
|||
mInfo("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
schema[0].name);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1044,7 +1044,7 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
|
|||
|
||||
mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1088,7 +1088,7 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
|
|||
mInfo("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
oldTagName, newTagName);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1162,7 +1162,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
|
|||
|
||||
mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1207,7 +1207,7 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
|
|||
|
||||
mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1251,7 +1251,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
|
|||
mInfo("app:%p:%p, stable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
oldName, newName);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
|
@ -1417,7 +1417,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -1694,7 +1694,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
} else {
|
||||
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code));
|
||||
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
return code;
|
||||
}
|
||||
|
@ -1780,7 +1780,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
|
|||
pMsg->pTable = (STableObj *)pTable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSdbOper desc = {
|
||||
SSWriteMsg desc = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pObj = pTable,
|
||||
.table = tsChildTableSdb,
|
||||
|
@ -1901,7 +1901,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2005,7 +2005,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
|
|||
|
||||
mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2038,7 +2038,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
|
|||
|
||||
mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2075,7 +2075,7 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
|
|||
mInfo("app:%p:%p, ctable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
oldName, newName);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2218,7 +2218,7 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (pTable->vgId == pVgroup->vgId) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2251,7 +2251,7 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2280,7 +2280,7 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (pTable->superTable == pStable) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
|
@ -2410,7 +2410,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||
SSdbOper desc = {
|
||||
SSWriteMsg desc = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pObj = pTable,
|
||||
.table = tsChildTableSdb,
|
||||
|
@ -2440,7 +2440,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid,
|
||||
tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle, mnodeMsg->incomingTs, sec, mnodeMsg->retry);
|
||||
|
||||
SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable};
|
||||
SSWriteMsg oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable};
|
||||
sdbDeleteRow(&oper);
|
||||
|
||||
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) {
|
||||
|
|
|
@ -42,12 +42,12 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg);
|
|||
static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg);
|
||||
|
||||
static int32_t mnodeUserActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeUserActionDestroy(SSWriteMsg *pOper) {
|
||||
tfree(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeUserActionInsert(SSWriteMsg *pOper) {
|
||||
SUserObj *pUser = pOper->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
|
||||
|
||||
|
@ -62,7 +62,7 @@ static int32_t mnodeUserActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeUserActionDelete(SSWriteMsg *pOper) {
|
||||
SUserObj *pUser = pOper->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
|
||||
|
||||
|
@ -74,7 +74,7 @@ static int32_t mnodeUserActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeUserActionUpdate(SSWriteMsg *pOper) {
|
||||
SUserObj *pUser = pOper->pObj;
|
||||
SUserObj *pSaved = mnodeGetUser(pUser->user);
|
||||
if (pUser != pSaved) {
|
||||
|
@ -85,14 +85,14 @@ static int32_t mnodeUserActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeUserActionEncode(SSWriteMsg *pOper) {
|
||||
SUserObj *pUser = pOper->pObj;
|
||||
memcpy(pOper->rowData, pUser, tsUserUpdateSize);
|
||||
pOper->rowSize = tsUserUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeUserActionDecode(SSWriteMsg *pOper) {
|
||||
SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj));
|
||||
if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -157,13 +157,13 @@ int32_t mnodeInitUsers() {
|
|||
.maxRowSize = tsUserUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mnodeUserActionInsert,
|
||||
.deleteFp = mnodeUserActionDelete,
|
||||
.updateFp = mnodeUserActionUpdate,
|
||||
.encodeFp = mnodeUserActionEncode,
|
||||
.decodeFp = mnodeUserActionDecode,
|
||||
.destroyFp = mnodeUserActionDestroy,
|
||||
.restoredFp = mnodeUserActionRestored
|
||||
.fpInsert = mnodeUserActionInsert,
|
||||
.fpDelete = mnodeUserActionDelete,
|
||||
.fpUpdate = mnodeUserActionUpdate,
|
||||
.fpEncode = mnodeUserActionEncode,
|
||||
.fpDecode = mnodeUserActionDecode,
|
||||
.fpDestroy = mnodeUserActionDestroy,
|
||||
.fpDestored = mnodeUserActionRestored
|
||||
};
|
||||
|
||||
tsUserSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -205,7 +205,7 @@ void mnodeDecUserRef(SUserObj *pUser) {
|
|||
}
|
||||
|
||||
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
|
@ -259,7 +259,7 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
|
|||
pUser->superAuth = 1;
|
||||
}
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
|
@ -279,7 +279,7 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
|
@ -562,7 +562,7 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
|
|||
if (pUser == NULL) break;
|
||||
|
||||
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
|
|
|
@ -72,12 +72,12 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) {
|
|||
tfree(pVgroup);
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionDestroy(SSdbOper *pOper) {
|
||||
static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pOper) {
|
||||
mnodeDestroyVgroup(pOper->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
|
||||
static int32_t mnodeVgroupActionInsert(SSWriteMsg *pOper) {
|
||||
SVgObj *pVgroup = pOper->pObj;
|
||||
|
||||
// refer to db
|
||||
|
@ -115,7 +115,7 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) {
|
||||
static int32_t mnodeVgroupActionDelete(SSWriteMsg *pOper) {
|
||||
SVgObj *pVgroup = pOper->pObj;
|
||||
|
||||
if (pVgroup->pDb == NULL) {
|
||||
|
@ -137,7 +137,7 @@ static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
|
||||
static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pOper) {
|
||||
SVgObj *pNew = pOper->pObj;
|
||||
SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId);
|
||||
|
||||
|
@ -176,7 +176,7 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionEncode(SSdbOper *pOper) {
|
||||
static int32_t mnodeVgroupActionEncode(SSWriteMsg *pOper) {
|
||||
SVgObj *pVgroup = pOper->pObj;
|
||||
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize);
|
||||
SVgObj *pTmpVgroup = pOper->rowData;
|
||||
|
@ -189,7 +189,7 @@ static int32_t mnodeVgroupActionEncode(SSdbOper *pOper) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionDecode(SSdbOper *pOper) {
|
||||
static int32_t mnodeVgroupActionDecode(SSWriteMsg *pOper) {
|
||||
SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj));
|
||||
if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -213,13 +213,13 @@ int32_t mnodeInitVgroups() {
|
|||
.maxRowSize = tsVgUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_AUTO,
|
||||
.insertFp = mnodeVgroupActionInsert,
|
||||
.deleteFp = mnodeVgroupActionDelete,
|
||||
.updateFp = mnodeVgroupActionUpdate,
|
||||
.encodeFp = mnodeVgroupActionEncode,
|
||||
.decodeFp = mnodeVgroupActionDecode,
|
||||
.destroyFp = mnodeVgroupActionDestroy,
|
||||
.restoredFp = mnodeVgroupActionRestored,
|
||||
.fpInsert = mnodeVgroupActionInsert,
|
||||
.fpDelete = mnodeVgroupActionDelete,
|
||||
.fpUpdate = mnodeVgroupActionUpdate,
|
||||
.fpEncode = mnodeVgroupActionEncode,
|
||||
.fpDecode = mnodeVgroupActionDecode,
|
||||
.fpDestroy = mnodeVgroupActionDestroy,
|
||||
.fpDestored = mnodeVgroupActionRestored,
|
||||
};
|
||||
|
||||
tsVgroupSdb = sdbOpenTable(&tableDesc);
|
||||
|
@ -253,7 +253,7 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
|
|||
}
|
||||
|
||||
void mnodeUpdateVgroup(SVgObj *pVgroup) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
|
@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
|
||||
tstrerror(code));
|
||||
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
return code;
|
||||
} else {
|
||||
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
|
||||
pDb->name, pVgroup->numOfVnodes);
|
||||
pVgroup->status = TAOS_VG_STATUS_READY;
|
||||
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
|
||||
(void)sdbUpdateRow(&desc);
|
||||
|
||||
dnodeReprocessMWriteMsg(pMsg);
|
||||
|
@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
|
||||
// pDb->name, pVgroup->numOfVnodes);
|
||||
// pVgroup->status = TAOS_VG_STATUS_READY;
|
||||
// SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
|
||||
// SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
|
||||
// (void)sdbUpdateRow(&desc);
|
||||
// dnodeReprocessMWriteMsg(pMsg);
|
||||
// return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
|
@ -571,7 +571,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
|
|||
pMsg->pVgroup = pVgroup;
|
||||
mnodeIncVgroupRef(pVgroup);
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
|
@ -595,7 +595,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
|
|||
} else {
|
||||
mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||
mnodeSendDropVgroupMsg(pVgroup, NULL);
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
|
@ -957,7 +957,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
if (mnodeMsg->received != mnodeMsg->expected) return;
|
||||
|
||||
if (mnodeMsg->received == mnodeMsg->successed) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
|
@ -973,7 +973,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
dnodeSendRpcMWriteRsp(mnodeMsg, code);
|
||||
}
|
||||
} else {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
|
@ -1031,7 +1031,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
|
||||
if (mnodeMsg->received != mnodeMsg->expected) return;
|
||||
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
|
@ -1084,7 +1084,7 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
|
|||
|
||||
if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) {
|
||||
mnodeDropAllChildTablesInVgroups(pVgroup);
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
|
@ -1135,7 +1135,7 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
|
|||
if (pVgroup == NULL) break;
|
||||
|
||||
if (pVgroup->pDb == pDropDb) {
|
||||
SSdbOper oper = {
|
||||
SSWriteMsg oper = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
|
|
|
@ -118,7 +118,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char *module;
|
||||
bool (*decodeFp)(struct HttpContext *pContext);
|
||||
bool (*fpDecode)(struct HttpContext *pContext);
|
||||
} HttpDecodeMethod;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -21,11 +21,11 @@
|
|||
#include "httpHandle.h"
|
||||
|
||||
bool httpDecodeRequest(HttpContext* pContext) {
|
||||
if (pContext->decodeMethod->decodeFp == NULL) {
|
||||
if (pContext->decodeMethod->fpDecode == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (*pContext->decodeMethod->decodeFp)(pContext);
|
||||
return (*pContext->decodeMethod->fpDecode)(pContext);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue