TD-2046
This commit is contained in:
parent
566b749cc5
commit
f323186f78
|
@ -23,8 +23,6 @@ extern "C" {
|
|||
#include "mnode.h"
|
||||
#include "twal.h"
|
||||
|
||||
struct SSdbTable;
|
||||
|
||||
typedef enum {
|
||||
SDB_TABLE_CLUSTER = 0,
|
||||
SDB_TABLE_DNODE = 1,
|
||||
|
@ -50,20 +48,20 @@ typedef enum {
|
|||
SDB_OPER_LOCAL = 1
|
||||
} ESdbOper;
|
||||
|
||||
typedef struct SSWriteMsg {
|
||||
ESdbOper type;
|
||||
int32_t processedCount; // for sync fwd callback
|
||||
int32_t code; // for callback in sdb queue
|
||||
int32_t rowSize;
|
||||
void * rowData;
|
||||
typedef struct SSdbRow {
|
||||
ESdbOper type;
|
||||
int32_t processedCount; // for sync fwd callback
|
||||
int32_t code; // for callback in sdb queue
|
||||
int32_t rowSize;
|
||||
void * rowData;
|
||||
void * pObj;
|
||||
void * pTable;
|
||||
SMnodeMsg *pMsg;
|
||||
int32_t (*fpReq)(SMnodeMsg *pMsg);
|
||||
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
|
||||
void * pRow;
|
||||
SMnodeMsg *pMsg;
|
||||
struct SSdbTable *pTable;
|
||||
char reserveForSync[16];
|
||||
SWalHead pHead[];
|
||||
} SSWriteMsg;
|
||||
char reserveForSync[16];
|
||||
SWalHead pHead[];
|
||||
} SSdbRow;
|
||||
|
||||
typedef struct {
|
||||
char * name;
|
||||
|
@ -72,12 +70,12 @@ typedef struct {
|
|||
int32_t refCountPos;
|
||||
ESdbTable id;
|
||||
ESdbKey keyType;
|
||||
int32_t (*fpInsert)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDelete)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpUpdate)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpEncode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDecode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDestroy)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpInsert)(SSdbRow *pRow);
|
||||
int32_t (*fpDelete)(SSdbRow *pRow);
|
||||
int32_t (*fpUpdate)(SSdbRow *pRow);
|
||||
int32_t (*fpEncode)(SSdbRow *pRow);
|
||||
int32_t (*fpDecode)(SSdbRow *pRow);
|
||||
int32_t (*fpDestroy)(SSdbRow *pRow);
|
||||
int32_t (*fpRestored)();
|
||||
} SSdbTableDesc;
|
||||
|
||||
|
@ -89,10 +87,10 @@ bool sdbIsMaster();
|
|||
bool sdbIsServing();
|
||||
void sdbUpdateMnodeRoles();
|
||||
|
||||
int32_t sdbInsertRow(SSWriteMsg *pWrite);
|
||||
int32_t sdbDeleteRow(SSWriteMsg *pWrite);
|
||||
int32_t sdbUpdateRow(SSWriteMsg *pWrite);
|
||||
int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite);
|
||||
int32_t sdbInsertRow(SSdbRow *pRow);
|
||||
int32_t sdbDeleteRow(SSdbRow *pRow);
|
||||
int32_t sdbUpdateRow(SSdbRow *pRow);
|
||||
int32_t sdbInsertRowToQueue(SSdbRow *pRow);
|
||||
|
||||
void *sdbGetRow(void *pTable, void *key);
|
||||
void *sdbFetchRow(void *pTable, void *pIter, void **ppRow);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tglobal.h"
|
||||
#include "dnode.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeInt.h"
|
||||
|
@ -25,36 +26,34 @@
|
|||
#include "mnodeUser.h"
|
||||
#include "mnodeVgroup.h"
|
||||
|
||||
#include "tglobal.h"
|
||||
|
||||
void * tsAcctSdb = NULL;
|
||||
static int32_t tsAcctUpdateSize;
|
||||
static int32_t mnodeCreateRootAcct();
|
||||
|
||||
static int32_t mnodeAcctActionDestroy(SSWriteMsg *pWMsg) {
|
||||
SAcctObj *pAcct = pWMsg->pRow;
|
||||
static int32_t mnodeAcctActionDestroy(SSdbRow *pRow) {
|
||||
SAcctObj *pAcct = pRow->pObj;
|
||||
pthread_mutex_destroy(&pAcct->mutex);
|
||||
tfree(pWMsg->pRow);
|
||||
tfree(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionInsert(SSWriteMsg *pWMsg) {
|
||||
SAcctObj *pAcct = pWMsg->pRow;
|
||||
static int32_t mnodeAcctActionInsert(SSdbRow *pRow) {
|
||||
SAcctObj *pAcct = pRow->pObj;
|
||||
memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo));
|
||||
pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS;
|
||||
pthread_mutex_init(&pAcct->mutex, NULL);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionDelete(SSWriteMsg *pWMsg) {
|
||||
SAcctObj *pAcct = pWMsg->pRow;
|
||||
static int32_t mnodeAcctActionDelete(SSdbRow *pRow) {
|
||||
SAcctObj *pAcct = pRow->pObj;
|
||||
mnodeDropAllUsers(pAcct);
|
||||
mnodeDropAllDbs(pAcct);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SAcctObj *pAcct = pWMsg->pRow;
|
||||
static int32_t mnodeAcctActionUpdate(SSdbRow *pRow) {
|
||||
SAcctObj *pAcct = pRow->pObj;
|
||||
SAcctObj *pSaved = mnodeGetAcct(pAcct->user);
|
||||
if (pAcct != pSaved) {
|
||||
memcpy(pSaved, pAcct, tsAcctUpdateSize);
|
||||
|
@ -64,19 +63,19 @@ static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionEncode(SSWriteMsg *pWMsg) {
|
||||
SAcctObj *pAcct = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pAcct, tsAcctUpdateSize);
|
||||
pWMsg->rowSize = tsAcctUpdateSize;
|
||||
static int32_t mnodeAcctActionEncode(SSdbRow *pRow) {
|
||||
SAcctObj *pAcct = pRow->pObj;
|
||||
memcpy(pRow->rowData, pAcct, tsAcctUpdateSize);
|
||||
pRow->rowSize = tsAcctUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAcctActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeAcctActionDecode(SSdbRow *pRow) {
|
||||
SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj));
|
||||
if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pAcct, pWMsg->rowData, tsAcctUpdateSize);
|
||||
pWMsg->pRow = pAcct;
|
||||
memcpy(pAcct, pRow->rowData, tsAcctUpdateSize);
|
||||
pRow->pObj = pAcct;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -226,13 +225,13 @@ static int32_t mnodeCreateRootAcct() {
|
|||
pAcct->acctId = sdbGetId(tsAcctSdb);
|
||||
pAcct->createdTime = taosGetTimestampMs();
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsAcctSdb,
|
||||
.pRow = pAcct,
|
||||
.pObj = pAcct,
|
||||
};
|
||||
|
||||
return sdbInsertRow(&wmsg);
|
||||
return sdbInsertRow(&row);
|
||||
}
|
||||
|
||||
#ifndef _ACCT
|
||||
|
|
|
@ -32,36 +32,36 @@ static int32_t mnodeCreateCluster();
|
|||
static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||
static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
static int32_t mnodeClusterActionDestroy(SSWriteMsg *pWMsg) {
|
||||
tfree(pWMsg->pRow);
|
||||
static int32_t mnodeClusterActionDestroy(SSdbRow *pRow) {
|
||||
tfree(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionInsert(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeClusterActionInsert(SSdbRow *pRow) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionDelete(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeClusterActionDelete(SSdbRow *pRow) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionUpdate(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeClusterActionUpdate(SSdbRow *pRow) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionEncode(SSWriteMsg *pWMsg) {
|
||||
SClusterObj *pCluster = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pCluster, tsClusterUpdateSize);
|
||||
pWMsg->rowSize = tsClusterUpdateSize;
|
||||
static int32_t mnodeClusterActionEncode(SSdbRow *pRow) {
|
||||
SClusterObj *pCluster = pRow->pObj;
|
||||
memcpy(pRow->rowData, pCluster, tsClusterUpdateSize);
|
||||
pRow->rowSize = tsClusterUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeClusterActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeClusterActionDecode(SSdbRow *pRow) {
|
||||
SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj));
|
||||
if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pCluster, pWMsg->rowData, tsClusterUpdateSize);
|
||||
pWMsg->pRow = pCluster;
|
||||
memcpy(pCluster, pRow->rowData, tsClusterUpdateSize);
|
||||
pRow->pObj = pCluster;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -145,13 +145,13 @@ static int32_t mnodeCreateCluster() {
|
|||
mDebug("uid is %s", pCluster->uid);
|
||||
}
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsClusterSdb,
|
||||
.pRow = pCluster,
|
||||
.pObj = pCluster,
|
||||
};
|
||||
|
||||
return sdbInsertRow(&wmsg);
|
||||
return sdbInsertRow(&row);
|
||||
}
|
||||
|
||||
const char* mnodeGetClusterId() {
|
||||
|
|
|
@ -56,8 +56,8 @@ static void mnodeDestroyDb(SDbObj *pDb) {
|
|||
tfree(pDb);
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionDestroy(SSWriteMsg *pWMsg) {
|
||||
mnodeDestroyDb(pWMsg->pRow);
|
||||
static int32_t mnodeDbActionDestroy(SSdbRow *pRow) {
|
||||
mnodeDestroyDb(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -65,8 +65,8 @@ int64_t mnodeGetDbNum() {
|
|||
return sdbGetNumOfRows(tsDbSdb);
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) {
|
||||
SDbObj *pDb = pWMsg->pRow;
|
||||
static int32_t mnodeDbActionInsert(SSdbRow *pRow) {
|
||||
SDbObj *pDb = pRow->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
|
||||
|
||||
pthread_mutex_init(&pDb->mutex, NULL);
|
||||
|
@ -91,8 +91,8 @@ static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) {
|
||||
SDbObj *pDb = pWMsg->pRow;
|
||||
static int32_t mnodeDbActionDelete(SSdbRow *pRow) {
|
||||
SDbObj *pDb = pRow->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
|
||||
|
||||
mnodeDropAllChildTables(pDb);
|
||||
|
@ -107,11 +107,11 @@ static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SDbObj *pNew = pWMsg->pRow;
|
||||
static int32_t mnodeDbActionUpdate(SSdbRow *pRow) {
|
||||
SDbObj *pNew = pRow->pObj;
|
||||
SDbObj *pDb = mnodeGetDb(pNew->name);
|
||||
if (pDb != NULL && pNew != pDb) {
|
||||
memcpy(pDb, pNew, pWMsg->rowSize);
|
||||
memcpy(pDb, pNew, pRow->rowSize);
|
||||
free(pNew->vgList);
|
||||
free(pNew);
|
||||
}
|
||||
|
@ -120,19 +120,19 @@ static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionEncode(SSWriteMsg *pWMsg) {
|
||||
SDbObj *pDb = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pDb, tsDbUpdateSize);
|
||||
pWMsg->rowSize = tsDbUpdateSize;
|
||||
static int32_t mnodeDbActionEncode(SSdbRow *pRow) {
|
||||
SDbObj *pDb = pRow->pObj;
|
||||
memcpy(pRow->rowData, pDb, tsDbUpdateSize);
|
||||
pRow->rowSize = tsDbUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDbActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeDbActionDecode(SSdbRow *pRow) {
|
||||
SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj));
|
||||
if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pDb, pWMsg->rowData, tsDbUpdateSize);
|
||||
pWMsg->pRow = pDb;
|
||||
memcpy(pDb, pRow->rowData, tsDbUpdateSize);
|
||||
pRow->pObj = pDb;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -412,16 +412,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
|
|||
pMsg->pDb = pDb;
|
||||
mnodeIncDbRef(pDb);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDbSdb,
|
||||
.pRow = pDb,
|
||||
.pObj = pDb,
|
||||
.rowSize = sizeof(SDbObj),
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeCreateDbCb
|
||||
};
|
||||
|
||||
code = sdbInsertRow(&wmsg);
|
||||
code = sdbInsertRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
|
||||
pMsg->pDb = NULL;
|
||||
|
@ -440,8 +440,8 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
|
|||
}
|
||||
|
||||
#if 0
|
||||
void mnodePrintVgroups(SDbObj *pDb, char *wmsg) {
|
||||
mInfo("db:%s, vgroup link from head, wmsg:%s", pDb->name, wmsg);
|
||||
void mnodePrintVgroups(SDbObj *pDb, char *row) {
|
||||
mInfo("db:%s, vgroup link from head, row:%s", pDb->name, row);
|
||||
SVgObj *pVgroup = pDb->pHead;
|
||||
while (pVgroup != NULL) {
|
||||
mInfo("vgId:%d", pVgroup->vgId);
|
||||
|
@ -807,13 +807,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
|
|||
if (pDb->status) return TSDB_CODE_SUCCESS;
|
||||
|
||||
pDb->status = true;
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDbSdb,
|
||||
.pRow = pDb
|
||||
.pObj = pDb
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&wmsg);
|
||||
int32_t code = sdbUpdateRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code));
|
||||
}
|
||||
|
@ -1019,15 +1019,15 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
|
|||
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
|
||||
pDb->cfg = newCfg;
|
||||
pDb->cfgVersion++;
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDbSdb,
|
||||
.pRow = pDb,
|
||||
.pObj = pDb,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeAlterDbCb
|
||||
};
|
||||
|
||||
code = sdbUpdateRow(&wmsg);
|
||||
code = sdbUpdateRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code));
|
||||
}
|
||||
|
@ -1071,15 +1071,15 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
|
|||
SDbObj *pDb = pMsg->pDb;
|
||||
mInfo("db:%s, drop db from sdb", pDb->name);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDbSdb,
|
||||
.pRow = pDb,
|
||||
.pObj = pDb,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeDropDbCb
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code));
|
||||
}
|
||||
|
@ -1134,13 +1134,13 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
|
|||
|
||||
if (pDb->pAcct == pAcct) {
|
||||
mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user);
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsDbSdb,
|
||||
.pRow = pDb
|
||||
.pObj = pDb
|
||||
};
|
||||
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfDbs++;
|
||||
}
|
||||
mnodeDecDbRef(pDb);
|
||||
|
|
|
@ -87,13 +87,13 @@ static char* offlineReason[] = {
|
|||
"unknown",
|
||||
};
|
||||
|
||||
static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pWMsg) {
|
||||
tfree(pWMsg->pRow);
|
||||
static int32_t mnodeDnodeActionDestroy(SSdbRow *pRow) {
|
||||
tfree(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) {
|
||||
SDnodeObj *pDnode = pWMsg->pRow;
|
||||
static int32_t mnodeDnodeActionInsert(SSdbRow *pRow) {
|
||||
SDnodeObj *pDnode = pRow->pObj;
|
||||
if (pDnode->status != TAOS_DN_STATUS_DROPPING) {
|
||||
pDnode->status = TAOS_DN_STATUS_OFFLINE;
|
||||
pDnode->lastAccess = tsAccessSquence;
|
||||
|
@ -107,8 +107,8 @@ static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) {
|
||||
SDnodeObj *pDnode = pWMsg->pRow;
|
||||
static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) {
|
||||
SDnodeObj *pDnode = pRow->pObj;
|
||||
|
||||
#ifndef _SYNC
|
||||
mnodeDropAllDnodeVgroups(pDnode);
|
||||
|
@ -121,11 +121,11 @@ static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SDnodeObj *pNew = pWMsg->pRow;
|
||||
static int32_t mnodeDnodeActionUpdate(SSdbRow *pRow) {
|
||||
SDnodeObj *pNew = pRow->pObj;
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
|
||||
if (pDnode != NULL && pNew != pDnode) {
|
||||
memcpy(pDnode, pNew, pWMsg->rowSize);
|
||||
memcpy(pDnode, pNew, pRow->rowSize);
|
||||
free(pNew);
|
||||
}
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
|
@ -134,19 +134,19 @@ static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionEncode(SSWriteMsg *pWMsg) {
|
||||
SDnodeObj *pDnode = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pDnode, tsDnodeUpdateSize);
|
||||
pWMsg->rowSize = tsDnodeUpdateSize;
|
||||
static int32_t mnodeDnodeActionEncode(SSdbRow *pRow) {
|
||||
SDnodeObj *pDnode = pRow->pObj;
|
||||
memcpy(pRow->rowData, pDnode, tsDnodeUpdateSize);
|
||||
pRow->rowSize = tsDnodeUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDnodeActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeDnodeActionDecode(SSdbRow *pRow) {
|
||||
SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj));
|
||||
if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pDnode, pWMsg->rowData, tsDnodeUpdateSize);
|
||||
pWMsg->pRow = pDnode;
|
||||
memcpy(pDnode, pRow->rowData, tsDnodeUpdateSize);
|
||||
pRow->pObj = pDnode;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -296,13 +296,13 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) {
|
|||
}
|
||||
|
||||
void mnodeUpdateDnode(SDnodeObj *pDnode) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDnodeSdb,
|
||||
.pRow = pDnode
|
||||
.pObj = pDnode
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&wmsg);
|
||||
int32_t code = sdbUpdateRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("dnodeId:%d, failed update", pDnode->dnodeId);
|
||||
}
|
||||
|
@ -644,15 +644,15 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
|
|||
tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN);
|
||||
taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDnodeSdb,
|
||||
.pRow = pDnode,
|
||||
.pObj = pDnode,
|
||||
.rowSize = sizeof(SDnodeObj),
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&wmsg);
|
||||
int32_t code = sdbInsertRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
int dnodeId = pDnode->dnodeId;
|
||||
tfree(pDnode);
|
||||
|
@ -665,14 +665,14 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsDnodeSdb,
|
||||
.pRow = pDnode,
|
||||
.pObj = pDnode,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
|
||||
} else {
|
||||
|
|
|
@ -58,13 +58,13 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
|
|||
#define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock)
|
||||
#endif
|
||||
|
||||
static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pWMsg) {
|
||||
tfree(pWMsg->pRow);
|
||||
static int32_t mnodeMnodeActionDestroy(SSdbRow *pRow) {
|
||||
tfree(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) {
|
||||
SMnodeObj *pMnode = pWMsg->pRow;
|
||||
static int32_t mnodeMnodeActionInsert(SSdbRow *pRow) {
|
||||
SMnodeObj *pMnode = pRow->pObj;
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
|
||||
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
|
||||
|
@ -76,8 +76,8 @@ static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) {
|
||||
SMnodeObj *pMnode = pWMsg->pRow;
|
||||
static int32_t mnodeMnodeActionDelete(SSdbRow *pRow) {
|
||||
SMnodeObj *pMnode = pRow->pObj;
|
||||
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
|
||||
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
|
@ -88,30 +88,30 @@ static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SMnodeObj *pMnode = pWMsg->pRow;
|
||||
static int32_t mnodeMnodeActionUpdate(SSdbRow *pRow) {
|
||||
SMnodeObj *pMnode = pRow->pObj;
|
||||
SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId);
|
||||
if (pMnode != pSaved) {
|
||||
memcpy(pSaved, pMnode, pWMsg->rowSize);
|
||||
memcpy(pSaved, pMnode, pRow->rowSize);
|
||||
free(pMnode);
|
||||
}
|
||||
mnodeDecMnodeRef(pSaved);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionEncode(SSWriteMsg *pWMsg) {
|
||||
SMnodeObj *pMnode = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pMnode, tsMnodeUpdateSize);
|
||||
pWMsg->rowSize = tsMnodeUpdateSize;
|
||||
static int32_t mnodeMnodeActionEncode(SSdbRow *pRow) {
|
||||
SMnodeObj *pMnode = pRow->pObj;
|
||||
memcpy(pRow->rowData, pMnode, tsMnodeUpdateSize);
|
||||
pRow->rowSize = tsMnodeUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeMnodeActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeMnodeActionDecode(SSdbRow *pRow) {
|
||||
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
|
||||
if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pMnode, pWMsg->rowData, tsMnodeUpdateSize);
|
||||
pWMsg->pRow = pMnode;
|
||||
memcpy(pMnode, pRow->rowData, tsMnodeUpdateSize);
|
||||
pRow->pObj = pMnode;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -325,10 +325,10 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
|||
pMnode->mnodeId = dnodeId;
|
||||
pMnode->createdTime = taosGetTimestampMs();
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsMnodeSdb,
|
||||
.pRow = pMnode,
|
||||
.pObj = pMnode,
|
||||
.fpRsp = mnodeCreateMnodeCb
|
||||
};
|
||||
|
||||
|
@ -342,7 +342,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
|||
return;
|
||||
}
|
||||
|
||||
code = sdbInsertRow(&wmsg);
|
||||
code = sdbInsertRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
|
||||
tfree(pMnode);
|
||||
|
@ -352,8 +352,8 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
|||
void mnodeDropMnodeLocal(int32_t dnodeId) {
|
||||
SMnodeObj *pMnode = mnodeGetMnode(dnodeId);
|
||||
if (pMnode != NULL) {
|
||||
SSWriteMsg wmsg = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pRow = pMnode};
|
||||
sdbDeleteRow(&wmsg);
|
||||
SSdbRow row = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pObj = pMnode};
|
||||
sdbDeleteRow(&row);
|
||||
mnodeDecMnodeRef(pMnode);
|
||||
}
|
||||
|
||||
|
@ -367,13 +367,13 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
|
|||
return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsMnodeSdb,
|
||||
.pRow = pMnode
|
||||
.pObj = pMnode
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
|
||||
sdbDecRef(tsMnodeSdb, pMnode);
|
||||
|
||||
|
|
|
@ -65,12 +65,12 @@ typedef struct SSdbTable {
|
|||
int32_t autoIndex;
|
||||
int64_t numOfRows;
|
||||
void * iHandle;
|
||||
int32_t (*fpInsert)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDelete)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpUpdate)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDecode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpEncode)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpDestroy)(SSWriteMsg *pWrite);
|
||||
int32_t (*fpInsert)(SSdbRow *pRow);
|
||||
int32_t (*fpDelete)(SSdbRow *pRow);
|
||||
int32_t (*fpUpdate)(SSdbRow *pRow);
|
||||
int32_t (*fpDecode)(SSdbRow *pRow);
|
||||
int32_t (*fpEncode)(SSdbRow *pRow);
|
||||
int32_t (*fpDestroy)(SSdbRow *pRow);
|
||||
int32_t (*fpRestored)();
|
||||
pthread_mutex_t mutex;
|
||||
} SSdbTable;
|
||||
|
@ -106,17 +106,17 @@ static taos_qall tsSdbWQall;
|
|||
static taos_queue tsSdbWQueue;
|
||||
static SSdbWorkerPool tsSdbPool;
|
||||
|
||||
static int32_t sdbProcessWrite(void *pWrite, void *pHead, int32_t qtype, void *unused);
|
||||
static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused);
|
||||
static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam);
|
||||
static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action);
|
||||
static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action);
|
||||
static void * sdbWorkerFp(void *pWorker);
|
||||
static int32_t sdbInitWorker();
|
||||
static void sdbCleanupWorker();
|
||||
static int32_t sdbAllocQueue();
|
||||
static void sdbFreeQueue();
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow);
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow);
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow);
|
||||
|
||||
int32_t sdbGetId(void *pTable) {
|
||||
return ((SSdbTable *)pTable)->autoIndex;
|
||||
|
@ -248,28 +248,28 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
|
|||
}
|
||||
|
||||
// failed to forward, need revert insert
|
||||
static void sdbHandleFailedConfirm(SSWriteMsg *pWrite) {
|
||||
SWalHead *pHead = pWrite->pHead;
|
||||
static void sdbHandleFailedConfirm(SSdbRow *pRow) {
|
||||
SWalHead *pHead = pRow->pHead;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
|
||||
sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pWrite->pRow,
|
||||
sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, actStr[action], tstrerror(pWrite->code));
|
||||
sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pRow->pObj,
|
||||
sdbGetKeyStr(pRow->pTable, pHead->cont), pHead->version, actStr[action], tstrerror(pRow->code));
|
||||
|
||||
// It's better to create a table in two stages, create it first and then set it success
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
SSWriteMsg wmsg = {.type = SDB_OPER_GLOBAL, .pTable = pWrite->pTable, .pRow = pWrite->pRow};
|
||||
sdbDeleteRow(&wmsg);
|
||||
SSdbRow row = {.type = SDB_OPER_GLOBAL, .pTable = pRow->pTable, .pObj = pRow->pObj};
|
||||
sdbDeleteRow(&row);
|
||||
}
|
||||
}
|
||||
|
||||
FORCE_INLINE
|
||||
static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
|
||||
if (wparam == NULL) return;
|
||||
SSWriteMsg *pWrite = wparam;
|
||||
SMnodeMsg * pMsg = pWrite->pMsg;
|
||||
SSdbRow *pRow = wparam;
|
||||
SMnodeMsg * pMsg = pRow->pMsg;
|
||||
|
||||
if (code <= 0) pWrite->code = code;
|
||||
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
|
||||
if (code <= 0) pRow->code = code;
|
||||
int32_t count = atomic_add_fetch_32(&pRow->processedCount, 1);
|
||||
if (count <= 1) {
|
||||
if (pMsg != NULL) sdbTrace("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, count, code);
|
||||
return;
|
||||
|
@ -277,13 +277,13 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
|
|||
if (pMsg != NULL) sdbTrace("vgId:1, msg:%p is confirmed, code:%x", pMsg, code);
|
||||
}
|
||||
|
||||
if (pWrite->code != TSDB_CODE_SUCCESS) sdbHandleFailedConfirm(pWrite);
|
||||
if (pRow->code != TSDB_CODE_SUCCESS) sdbHandleFailedConfirm(pRow);
|
||||
|
||||
if (pWrite->fpRsp != NULL) {
|
||||
pWrite->code = (*pWrite->fpRsp)(pMsg, pWrite->code);
|
||||
if (pRow->fpRsp != NULL) {
|
||||
pRow->code = (*pRow->fpRsp)(pMsg, pRow->code);
|
||||
}
|
||||
|
||||
dnodeSendRpcMWriteRsp(pMsg, pWrite->code);
|
||||
dnodeSendRpcMWriteRsp(pMsg, pRow->code);
|
||||
}
|
||||
|
||||
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
|
||||
|
@ -447,8 +447,8 @@ void sdbDecRef(void *tparam, void *pRow) {
|
|||
int32_t *updateEnd = pRow + pTable->refCountPos - 4;
|
||||
if (refCount <= 0 && *updateEnd) {
|
||||
sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount);
|
||||
SSWriteMsg wmsg = {.pRow = pRow};
|
||||
(*pTable->fpDestroy)(&wmsg);
|
||||
SSdbRow row = {.pObj = pRow};
|
||||
(*pTable->fpDestroy)(&row);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -485,8 +485,8 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
|
|||
return sdbGetRow(pTable, sdbGetObjKey(pTable, key));
|
||||
}
|
||||
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
||||
void * key = sdbGetObjKey(pTable, pWrite->pRow);
|
||||
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) {
|
||||
void * key = sdbGetObjKey(pTable, pRow->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
|
@ -494,43 +494,43 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
|||
}
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
taosHashPut(pTable->iHandle, key, keySize, &pWrite->pRow, sizeof(int64_t));
|
||||
taosHashPut(pTable->iHandle, key, keySize, &pRow->pObj, sizeof(int64_t));
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbIncRef(pTable, pWrite->pRow);
|
||||
sdbIncRef(pTable, pRow->pObj);
|
||||
atomic_add_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pWrite->pRow));
|
||||
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pRow->pObj));
|
||||
} else {
|
||||
atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
|
||||
sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name,
|
||||
sdbGetRowStr(pTable, pWrite->pRow), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg);
|
||||
sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg);
|
||||
|
||||
int32_t code = (*pTable->fpInsert)(pWrite);
|
||||
int32_t code = (*pTable->fpInsert)(pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->name,
|
||||
sdbGetRowStr(pTable, pWrite->pRow));
|
||||
sdbDeleteHash(pTable, pWrite);
|
||||
sdbGetRowStr(pTable, pRow->pObj));
|
||||
sdbDeleteHash(pTable, pRow);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
||||
int32_t *updateEnd = pWrite->pRow + pTable->refCountPos - 4;
|
||||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
|
||||
int32_t *updateEnd = pRow->pObj + pTable->refCountPos - 4;
|
||||
bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0;
|
||||
if (!set) {
|
||||
sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->name,
|
||||
sdbGetRowStr(pTable, pWrite->pRow));
|
||||
sdbGetRowStr(pTable, pRow->pObj));
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
(*pTable->fpDelete)(pWrite);
|
||||
(*pTable->fpDelete)(pRow);
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pWrite->pRow);
|
||||
void * key = sdbGetObjKey(pTable, pRow->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
keySize = strlen((char *)key);
|
||||
|
@ -543,23 +543,23 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
|||
atomic_sub_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
|
||||
sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg);
|
||||
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
|
||||
|
||||
sdbDecRef(pTable, pWrite->pRow);
|
||||
sdbDecRef(pTable, pRow->pObj);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
||||
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) {
|
||||
sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
|
||||
sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg);
|
||||
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
|
||||
|
||||
(*pTable->fpUpdate)(pWrite);
|
||||
(*pTable->fpUpdate)(pRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
|
||||
SSWriteMsg *pWrite = wparam;
|
||||
SSdbRow *pRow = wparam;
|
||||
SWalHead *pHead = hparam;
|
||||
int32_t tableId = pHead->msgType / 10;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
|
@ -598,22 +598,22 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus
|
|||
|
||||
pthread_mutex_unlock(&tsSdbMgmt.mutex);
|
||||
|
||||
// from app, wmsg is created
|
||||
if (pWrite != NULL) {
|
||||
// from app, row is created
|
||||
if (pRow != NULL) {
|
||||
// forward to peers
|
||||
pWrite->processedCount = 0;
|
||||
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC);
|
||||
if (syncCode <= 0) pWrite->processedCount = 1;
|
||||
pRow->processedCount = 0;
|
||||
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC);
|
||||
if (syncCode <= 0) pRow->processedCount = 1;
|
||||
|
||||
if (syncCode < 0) {
|
||||
sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->name,
|
||||
tstrerror(syncCode), actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
|
||||
tstrerror(syncCode), actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pRow->pMsg);
|
||||
} else if (syncCode > 0) {
|
||||
sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->name,
|
||||
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
|
||||
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pRow->pMsg);
|
||||
} else {
|
||||
sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->name,
|
||||
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
|
||||
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pRow->pMsg);
|
||||
}
|
||||
return syncCode;
|
||||
}
|
||||
|
@ -622,71 +622,71 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus
|
|||
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||
|
||||
// even it is WAL/FWD, it shall be called to update version in sync
|
||||
syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC);
|
||||
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC);
|
||||
|
||||
// from wal or forward msg, wmsg not created, should add into hash
|
||||
// from wal or forward msg, row not created, should add into hash
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
|
||||
code = (*pTable->fpDecode)(&wmsg);
|
||||
return sdbInsertHash(pTable, &wmsg);
|
||||
SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
|
||||
code = (*pTable->fpDecode)(&row);
|
||||
return sdbInsertHash(pTable, &row);
|
||||
} else if (action == SDB_ACTION_DELETE) {
|
||||
void *pRow = sdbGetRowMeta(pTable, pHead->cont);
|
||||
if (pRow == NULL) {
|
||||
void *pObj = sdbGetRowMeta(pTable, pHead->cont);
|
||||
if (pObj == NULL) {
|
||||
sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->name,
|
||||
sdbGetKeyStr(pTable, pHead->cont));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SSWriteMsg wmsg = {.pTable = pTable, .pRow = pRow};
|
||||
return sdbDeleteHash(pTable, &wmsg);
|
||||
SSdbRow row = {.pTable = pTable, .pObj = pObj};
|
||||
return sdbDeleteHash(pTable, &row);
|
||||
} else if (action == SDB_ACTION_UPDATE) {
|
||||
void *pRow = sdbGetRowMeta(pTable, pHead->cont);
|
||||
if (pRow == NULL) {
|
||||
void *pObj = sdbGetRowMeta(pTable, pHead->cont);
|
||||
if (pObj == NULL) {
|
||||
sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->name,
|
||||
sdbGetKeyStr(pTable, pHead->cont));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
|
||||
code = (*pTable->fpDecode)(&wmsg);
|
||||
return sdbUpdateHash(pTable, &wmsg);
|
||||
SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
|
||||
code = (*pTable->fpDecode)(&row);
|
||||
return sdbUpdateHash(pTable, &row);
|
||||
} else {
|
||||
return TSDB_CODE_MND_INVALID_MSG_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sdbInsertRow(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = pWrite->pTable;
|
||||
int32_t sdbInsertRow(SSdbRow *pRow) {
|
||||
SSdbTable *pTable = pRow->pTable;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
if (sdbGetRowFromObj(pTable, pWrite->pRow)) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert:%s since it exist", pTable->name, sdbGetRowStr(pTable, pWrite->pRow));
|
||||
sdbDecRef(pTable, pWrite->pRow);
|
||||
if (sdbGetRowFromObj(pTable, pRow->pObj)) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert:%s since it exist", pTable->name, sdbGetRowStr(pTable, pRow->pObj));
|
||||
sdbDecRef(pTable, pRow->pObj);
|
||||
return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE;
|
||||
}
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
*((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
*((uint32_t *)pRow->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
|
||||
// let vgId increase from 2
|
||||
if (pTable->autoIndex == 1 && pTable->id == SDB_TABLE_VGROUP) {
|
||||
*((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
*((uint32_t *)pRow->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = sdbInsertHash(pTable, pWrite);
|
||||
int32_t code = sdbInsertHash(pTable, pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to insert:%s into hash", pTable->name, sdbGetRowStr(pTable, pWrite->pRow));
|
||||
sdbError("vgId:1, sdb:%s, failed to insert:%s into hash", pTable->name, sdbGetRowStr(pTable, pRow->pObj));
|
||||
return code;
|
||||
}
|
||||
|
||||
// just insert data into memory
|
||||
if (pWrite->type != SDB_OPER_GLOBAL) {
|
||||
if (pRow->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pWrite->fpReq) {
|
||||
return (*pWrite->fpReq)(pWrite->pMsg);
|
||||
if (pRow->fpReq) {
|
||||
return (*pRow->fpReq)(pRow->pMsg);
|
||||
} else {
|
||||
return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT);
|
||||
return sdbWriteRowToQueue(pRow, SDB_ACTION_INSERT);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -698,59 +698,59 @@ bool sdbCheckRowDeleted(void *tparam, void *pRow) {
|
|||
return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1;
|
||||
}
|
||||
|
||||
int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = pWrite->pTable;
|
||||
int32_t sdbDeleteRow(SSdbRow *pRow) {
|
||||
SSdbTable *pTable = pRow->pTable;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pRow);
|
||||
if (pRow == NULL) {
|
||||
void *pObj = sdbGetRowMetaFromObj(pTable, pRow->pObj);
|
||||
if (pObj == NULL) {
|
||||
sdbDebug("vgId:1, sdb:%s, record is not there, delete failed", pTable->name);
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
int32_t code = sdbDeleteHash(pTable, pWrite);
|
||||
int32_t code = sdbDeleteHash(pTable, pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to delete from hash", pTable->name);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just delete data from memory
|
||||
if (pWrite->type != SDB_OPER_GLOBAL) {
|
||||
if (pRow->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pWrite->fpReq) {
|
||||
return (*pWrite->fpReq)(pWrite->pMsg);
|
||||
if (pRow->fpReq) {
|
||||
return (*pRow->fpReq)(pRow->pMsg);
|
||||
} else {
|
||||
return sdbWriteRowToQueue(pWrite, SDB_ACTION_DELETE);
|
||||
return sdbWriteRowToQueue(pRow, SDB_ACTION_DELETE);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
|
||||
SSdbTable *pTable = pWrite->pTable;
|
||||
int32_t sdbUpdateRow(SSdbRow *pRow) {
|
||||
SSdbTable *pTable = pRow->pTable;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pRow);
|
||||
if (pRow == NULL) {
|
||||
void *pObj = sdbGetRowMetaFromObj(pTable, pRow->pObj);
|
||||
if (pObj == NULL) {
|
||||
sdbDebug("vgId:1, sdb:%s, record is not there, update failed", pTable->name);
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
int32_t code = sdbUpdateHash(pTable, pWrite);
|
||||
int32_t code = sdbUpdateHash(pTable, pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("vgId:1, sdb:%s, failed to update hash", pTable->name);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just update data in memory
|
||||
if (pWrite->type != SDB_OPER_GLOBAL) {
|
||||
if (pRow->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pWrite->fpReq) {
|
||||
return (*pWrite->fpReq)(pWrite->pMsg);
|
||||
if (pRow->fpReq) {
|
||||
return (*pRow->fpReq)(pRow->pMsg);
|
||||
} else {
|
||||
return sdbWriteRowToQueue(pWrite, SDB_ACTION_UPDATE);
|
||||
return sdbWriteRowToQueue(pRow, SDB_ACTION_UPDATE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -830,12 +830,12 @@ void sdbCloseTable(void *handle) {
|
|||
void **ppRow = taosHashIterGet(pIter);
|
||||
if (ppRow == NULL) continue;
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
.pRow = *ppRow,
|
||||
SSdbRow row = {
|
||||
.pObj = *ppRow,
|
||||
.pTable = pTable,
|
||||
};
|
||||
|
||||
(*pTable->fpDestroy)(&wmsg);
|
||||
(*pTable->fpDestroy)(&row);
|
||||
}
|
||||
|
||||
taosHashDestroyIter(pIter);
|
||||
|
@ -934,12 +934,12 @@ static void sdbFreeQueue() {
|
|||
tsSdbWQueue = NULL;
|
||||
}
|
||||
|
||||
static int32_t sdbWriteToQueue(SSWriteMsg *pWrite, int32_t qtype) {
|
||||
SWalHead *pHead = pWrite->pHead;
|
||||
static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) {
|
||||
SWalHead *pHead = pRow->pHead;
|
||||
|
||||
if (pHead->len > TSDB_MAX_WAL_SIZE) {
|
||||
sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version);
|
||||
taosFreeQitem(pWrite);
|
||||
taosFreeQitem(pRow);
|
||||
return TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
}
|
||||
|
||||
|
@ -949,64 +949,64 @@ static int32_t sdbWriteToQueue(SSWriteMsg *pWrite, int32_t qtype) {
|
|||
taosMsleep(1);
|
||||
}
|
||||
|
||||
sdbIncRef(pWrite->pTable, pWrite->pRow);
|
||||
sdbIncRef(pRow->pTable, pRow->pObj);
|
||||
|
||||
sdbTrace("vgId:1, msg:%p write into to sdb queue", pWrite->pMsg);
|
||||
taosWriteQitem(tsSdbWQueue, qtype, pWrite);
|
||||
sdbTrace("vgId:1, msg:%p write into to sdb queue", pRow->pMsg);
|
||||
taosWriteQitem(tsSdbWQueue, qtype, pRow);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static void sdbFreeFromQueue(SSWriteMsg *pWrite) {
|
||||
static void sdbFreeFromQueue(SSdbRow *pRow) {
|
||||
int32_t queued = atomic_sub_fetch_32(&tsSdbMgmt.queuedMsg, 1);
|
||||
sdbTrace("vgId:1, msg:%p free from sdb queue, queued:%d", pWrite->pMsg, queued);
|
||||
sdbTrace("vgId:1, msg:%p free from sdb queue, queued:%d", pRow->pMsg, queued);
|
||||
|
||||
sdbDecRef(pWrite->pTable, pWrite->pRow);
|
||||
taosFreeQitem(pWrite);
|
||||
sdbDecRef(pRow->pTable, pRow->pObj);
|
||||
taosFreeQitem(pRow);
|
||||
}
|
||||
|
||||
static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
|
||||
SWalHead *pHead = wparam;
|
||||
|
||||
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pHead->len;
|
||||
SSWriteMsg *pWrite = taosAllocateQitem(size);
|
||||
if (pWrite == NULL) {
|
||||
int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len;
|
||||
SSdbRow *pRow = taosAllocateQitem(size);
|
||||
if (pRow == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return sdbWriteToQueue(pWrite, qtype);
|
||||
return sdbWriteToQueue(pRow, qtype);
|
||||
}
|
||||
|
||||
static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action) {
|
||||
SSdbTable *pTable = pInputWrite->pTable;
|
||||
static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) {
|
||||
SSdbTable *pTable = pInputRow->pTable;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SSWriteMsg *pWrite = taosAllocateQitem(size);
|
||||
if (pWrite == NULL) {
|
||||
int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SSdbRow *pRow = taosAllocateQitem(size);
|
||||
if (pRow == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pWrite, pInputWrite, sizeof(SSWriteMsg));
|
||||
pWrite->processedCount = 1;
|
||||
memcpy(pRow, pInputRow, sizeof(SSdbRow));
|
||||
pRow->processedCount = 1;
|
||||
|
||||
SWalHead *pHead = pWrite->pHead;
|
||||
pWrite->rowData = pHead->cont;
|
||||
(*pTable->fpEncode)(pWrite);
|
||||
SWalHead *pHead = pRow->pHead;
|
||||
pRow->rowData = pHead->cont;
|
||||
(*pTable->fpEncode)(pRow);
|
||||
|
||||
pHead->len = pWrite->rowSize;
|
||||
pHead->len = pRow->rowSize;
|
||||
pHead->version = 0;
|
||||
pHead->msgType = pTable->id * 10 + action;
|
||||
|
||||
return sdbWriteToQueue(pWrite, TAOS_QTYPE_RPC);
|
||||
return sdbWriteToQueue(pRow, TAOS_QTYPE_RPC);
|
||||
}
|
||||
|
||||
int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite) { return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT); }
|
||||
int32_t sdbInsertRowToQueue(SSdbRow *pRow) { return sdbWriteRowToQueue(pRow, SDB_ACTION_INSERT); }
|
||||
|
||||
static void *sdbWorkerFp(void *pWorker) {
|
||||
SSWriteMsg *pWrite;
|
||||
int32_t qtype;
|
||||
void * unUsed;
|
||||
SSdbRow *pRow;
|
||||
int32_t qtype;
|
||||
void * unUsed;
|
||||
|
||||
while (1) {
|
||||
int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
|
||||
|
@ -1016,14 +1016,14 @@ static void *sdbWorkerFp(void *pWorker) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite);
|
||||
sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pWrite->pMsg, pWrite->pRow,
|
||||
pWrite->pHead->version);
|
||||
taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow);
|
||||
sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pRow->pMsg, pRow->pObj,
|
||||
pRow->pHead->version);
|
||||
|
||||
pWrite->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pWrite : NULL, pWrite->pHead, qtype, NULL);
|
||||
if (pWrite->code > 0) pWrite->code = 0;
|
||||
pRow->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pRow : NULL, pRow->pHead, qtype, NULL);
|
||||
if (pRow->code > 0) pRow->code = 0;
|
||||
|
||||
sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pWrite->pMsg, pWrite->code);
|
||||
sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pRow->pMsg, pRow->code);
|
||||
}
|
||||
|
||||
walFsync(tsSdbMgmt.wal, true);
|
||||
|
@ -1031,16 +1031,16 @@ static void *sdbWorkerFp(void *pWorker) {
|
|||
// browse all items, and process them one by one
|
||||
taosResetQitems(tsSdbWQall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite);
|
||||
taosGetQitem(tsSdbWQall, &qtype, (void **)&pRow);
|
||||
|
||||
if (qtype == TAOS_QTYPE_RPC) {
|
||||
sdbConfirmForward(NULL, pWrite, pWrite->code);
|
||||
sdbConfirmForward(NULL, pRow, pRow->code);
|
||||
} else if (qtype == TAOS_QTYPE_FWD) {
|
||||
syncConfirmForward(tsSdbMgmt.sync, pWrite->pHead->version, pWrite->code);
|
||||
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code);
|
||||
} else {
|
||||
}
|
||||
|
||||
sdbFreeFromQueue(pWrite);
|
||||
sdbFreeFromQueue(pRow);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -99,13 +99,13 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
|
|||
tfree(pTable);
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pWMsg) {
|
||||
mnodeDestroyChildTable(pWMsg->pRow);
|
||||
static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) {
|
||||
mnodeDestroyChildTable(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) {
|
||||
SCTableObj *pTable = pWMsg->pRow;
|
||||
static int32_t mnodeChildTableActionInsert(SSdbRow *pRow) {
|
||||
SCTableObj *pTable = pRow->pObj;
|
||||
|
||||
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
|
@ -153,8 +153,8 @@ static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) {
|
||||
SCTableObj *pTable = pWMsg->pRow;
|
||||
static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) {
|
||||
SCTableObj *pTable = pRow->pObj;
|
||||
if (pTable->vgId == 0) {
|
||||
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
||||
}
|
||||
|
@ -189,8 +189,8 @@ static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SCTableObj *pNew = pWMsg->pRow;
|
||||
static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) {
|
||||
SCTableObj *pNew = pRow->pObj;
|
||||
SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId);
|
||||
if (pTable != pNew) {
|
||||
void *oldTableId = pTable->info.tableId;
|
||||
|
@ -216,50 +216,50 @@ static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionEncode(SSWriteMsg *pWMsg) {
|
||||
SCTableObj *pTable = pWMsg->pRow;
|
||||
assert(pTable != NULL && pWMsg->rowData != NULL);
|
||||
static int32_t mnodeChildTableActionEncode(SSdbRow *pRow) {
|
||||
SCTableObj *pTable = pRow->pObj;
|
||||
assert(pTable != NULL && pRow->rowData != NULL);
|
||||
|
||||
int32_t len = strlen(pTable->info.tableId);
|
||||
if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
|
||||
memcpy(pWMsg->rowData, pTable->info.tableId, len);
|
||||
memset(pWMsg->rowData + len, 0, 1);
|
||||
memcpy(pRow->rowData, pTable->info.tableId, len);
|
||||
memset(pRow->rowData + len, 0, 1);
|
||||
len++;
|
||||
|
||||
memcpy(pWMsg->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize);
|
||||
memcpy(pRow->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize);
|
||||
len += tsChildTableUpdateSize;
|
||||
|
||||
if (pTable->info.type != TSDB_CHILD_TABLE) {
|
||||
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
|
||||
memcpy(pWMsg->rowData + len, pTable->schema, schemaSize);
|
||||
memcpy(pRow->rowData + len, pTable->schema, schemaSize);
|
||||
len += schemaSize;
|
||||
|
||||
if (pTable->sqlLen != 0) {
|
||||
memcpy(pWMsg->rowData + len, pTable->sql, pTable->sqlLen);
|
||||
memcpy(pRow->rowData + len, pTable->sql, pTable->sqlLen);
|
||||
len += pTable->sqlLen;
|
||||
}
|
||||
}
|
||||
|
||||
pWMsg->rowSize = len;
|
||||
pRow->rowSize = len;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) {
|
||||
assert(pWMsg->rowData != NULL);
|
||||
static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) {
|
||||
assert(pRow->rowData != NULL);
|
||||
SCTableObj *pTable = calloc(1, sizeof(SCTableObj));
|
||||
if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
int32_t len = strlen(pWMsg->rowData);
|
||||
int32_t len = strlen(pRow->rowData);
|
||||
if (len >= TSDB_TABLE_FNAME_LEN) {
|
||||
free(pTable);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
}
|
||||
pTable->info.tableId = strdup(pWMsg->rowData);
|
||||
pTable->info.tableId = strdup(pRow->rowData);
|
||||
len++;
|
||||
|
||||
memcpy((char*)pTable + sizeof(char *), pWMsg->rowData + len, tsChildTableUpdateSize);
|
||||
memcpy((char*)pTable + sizeof(char *), pRow->rowData + len, tsChildTableUpdateSize);
|
||||
len += tsChildTableUpdateSize;
|
||||
|
||||
if (pTable->info.type != TSDB_CHILD_TABLE) {
|
||||
|
@ -269,7 +269,7 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) {
|
|||
mnodeDestroyChildTable(pTable);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_TYPE;
|
||||
}
|
||||
memcpy(pTable->schema, pWMsg->rowData + len, schemaSize);
|
||||
memcpy(pTable->schema, pRow->rowData + len, schemaSize);
|
||||
len += schemaSize;
|
||||
|
||||
if (pTable->sqlLen != 0) {
|
||||
|
@ -278,11 +278,11 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) {
|
|||
mnodeDestroyChildTable(pTable);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pTable->sql, pWMsg->rowData + len, pTable->sqlLen);
|
||||
memcpy(pTable->sql, pRow->rowData + len, pTable->sqlLen);
|
||||
}
|
||||
}
|
||||
|
||||
pWMsg->pRow = pTable;
|
||||
pRow->pObj = pTable;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -297,7 +297,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId);
|
||||
if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId);
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
mnodeDecDbRef(pDb);
|
||||
|
@ -309,7 +309,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
if (pVgroup == NULL) {
|
||||
mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid);
|
||||
pTable->vgId = 0;
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
|
@ -320,7 +320,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it",
|
||||
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid);
|
||||
pTable->vgId = 0;
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
|
@ -331,7 +331,7 @@ static int32_t mnodeChildTableActionRestored() {
|
|||
if (pSuperTable == NULL) {
|
||||
mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid);
|
||||
pTable->vgId = 0;
|
||||
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pRow = pTable, .pTable = tsChildTableSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
|
@ -430,13 +430,13 @@ static void mnodeDestroySuperTable(SSTableObj *pStable) {
|
|||
tfree(pStable);
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pWMsg) {
|
||||
mnodeDestroySuperTable(pWMsg->pRow);
|
||||
static int32_t mnodeSuperTableActionDestroy(SSdbRow *pRow) {
|
||||
mnodeDestroySuperTable(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) {
|
||||
SSTableObj *pStable = pWMsg->pRow;
|
||||
static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) {
|
||||
SSTableObj *pStable = pRow->pObj;
|
||||
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
|
||||
if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) {
|
||||
mnodeAddSuperTableIntoDb(pDb);
|
||||
|
@ -446,8 +446,8 @@ static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) {
|
||||
SSTableObj *pStable = pWMsg->pRow;
|
||||
static int32_t mnodeSuperTableActionDelete(SSdbRow *pRow) {
|
||||
SSTableObj *pStable = pRow->pObj;
|
||||
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
|
||||
if (pDb != NULL) {
|
||||
mnodeRemoveSuperTableFromDb(pDb);
|
||||
|
@ -458,8 +458,8 @@ static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SSTableObj *pNew = pWMsg->pRow;
|
||||
static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
|
||||
SSTableObj *pNew = pRow->pObj;
|
||||
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
|
||||
if (pTable != NULL && pTable != pNew) {
|
||||
void *oldTableId = pTable->info.tableId;
|
||||
|
@ -483,43 +483,43 @@ static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pWMsg) {
|
||||
SSTableObj *pStable = pWMsg->pRow;
|
||||
assert(pWMsg->pRow != NULL && pWMsg->rowData != NULL);
|
||||
static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) {
|
||||
SSTableObj *pStable = pRow->pObj;
|
||||
assert(pRow->pObj != NULL && pRow->rowData != NULL);
|
||||
|
||||
int32_t len = strlen(pStable->info.tableId);
|
||||
if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
|
||||
memcpy(pWMsg->rowData, pStable->info.tableId, len);
|
||||
memset(pWMsg->rowData + len, 0, 1);
|
||||
memcpy(pRow->rowData, pStable->info.tableId, len);
|
||||
memset(pRow->rowData + len, 0, 1);
|
||||
len++;
|
||||
|
||||
memcpy(pWMsg->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize);
|
||||
memcpy(pRow->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize);
|
||||
len += tsSuperTableUpdateSize;
|
||||
|
||||
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
|
||||
memcpy(pWMsg->rowData + len, pStable->schema, schemaSize);
|
||||
memcpy(pRow->rowData + len, pStable->schema, schemaSize);
|
||||
len += schemaSize;
|
||||
|
||||
pWMsg->rowSize = len;
|
||||
pRow->rowSize = len;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) {
|
||||
assert(pWMsg->rowData != NULL);
|
||||
static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
|
||||
assert(pRow->rowData != NULL);
|
||||
SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj));
|
||||
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
int32_t len = strlen(pWMsg->rowData);
|
||||
int32_t len = strlen(pRow->rowData);
|
||||
if (len >= TSDB_TABLE_FNAME_LEN){
|
||||
free(pStable);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
}
|
||||
pStable->info.tableId = strdup(pWMsg->rowData);
|
||||
pStable->info.tableId = strdup(pRow->rowData);
|
||||
len++;
|
||||
|
||||
memcpy((char*)pStable + sizeof(char *), pWMsg->rowData + len, tsSuperTableUpdateSize);
|
||||
memcpy((char*)pStable + sizeof(char *), pRow->rowData + len, tsSuperTableUpdateSize);
|
||||
len += tsSuperTableUpdateSize;
|
||||
|
||||
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
|
||||
|
@ -529,9 +529,9 @@ static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_MND_NOT_SUPER_TABLE;
|
||||
}
|
||||
|
||||
memcpy(pStable->schema, pWMsg->rowData + len, schemaSize);
|
||||
memcpy(pStable->schema, pRow->rowData + len, schemaSize);
|
||||
|
||||
pWMsg->pRow = pStable;
|
||||
pRow->pObj = pStable;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -828,7 +828,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
} else {
|
||||
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
tstrerror(code));
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pTable, .pTable = tsSuperTableSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
}
|
||||
|
||||
|
@ -878,16 +878,16 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
pMsg->pTable = (STableObj *)pStable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.rowSize = sizeof(SSTableObj) + schemaSize,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeCreateSuperTableCb
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&wmsg);
|
||||
int32_t code = sdbInsertRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mnodeDestroySuperTable(pStable);
|
||||
pMsg->pTable = NULL;
|
||||
|
@ -937,15 +937,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
mnodeDropAllChildTablesInStable(pStable);
|
||||
}
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeDropSuperTableCb
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
tstrerror(code));
|
||||
|
@ -1010,15 +1010,15 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
|
|||
mInfo("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
schema[0].name);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeAddSuperTableTagCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
|
@ -1044,15 +1044,15 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
|
|||
|
||||
mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeDropSuperTableTagCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
|
@ -1088,15 +1088,15 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
|
|||
mInfo("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
oldTagName, newTagName);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeModifySuperTableTagNameCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) {
|
||||
|
@ -1162,15 +1162,15 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
|
|||
|
||||
mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeAddSuperTableColumnCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
|
@ -1207,15 +1207,15 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
|
|||
|
||||
mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeDropSuperTableColumnCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
|
@ -1251,15 +1251,15 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
|
|||
mInfo("app:%p:%p, stable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
oldName, newName);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pStable,
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeChangeSuperTableColumnCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
// show super tables
|
||||
|
@ -1417,12 +1417,12 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsSuperTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfTables ++;
|
||||
}
|
||||
|
||||
|
@ -1694,7 +1694,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
} else {
|
||||
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code));
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pTable, .pTable = tsChildTableSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsChildTableSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
return code;
|
||||
}
|
||||
|
@ -1780,9 +1780,9 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
|
|||
pMsg->pTable = (STableObj *)pTable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSWriteMsg desc = {
|
||||
SSdbRow desc = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pMsg = pMsg,
|
||||
.fpReq = mnodeDoCreateChildTableFp
|
||||
|
@ -1901,15 +1901,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeDropChildTableCb
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
tstrerror(code));
|
||||
|
@ -2005,15 +2005,15 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
|
|||
|
||||
mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeAlterNormalTableColumnCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
|
||||
|
@ -2038,15 +2038,15 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
|
|||
|
||||
mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeAlterNormalTableColumnCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) {
|
||||
|
@ -2075,15 +2075,15 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
|
|||
mInfo("app:%p:%p, ctable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
oldName, newName);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
.pMsg = pMsg,
|
||||
.fpRsp = mnodeAlterNormalTableColumnCb
|
||||
};
|
||||
|
||||
return sdbUpdateRow(&wmsg);
|
||||
return sdbUpdateRow(&row);
|
||||
}
|
||||
|
||||
static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTable) {
|
||||
|
@ -2218,12 +2218,12 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (pTable->vgId == pVgroup->vgId) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfTables++;
|
||||
}
|
||||
mnodeDecTableRef(pTable);
|
||||
|
@ -2251,12 +2251,12 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfTables++;
|
||||
}
|
||||
mnodeDecTableRef(pTable);
|
||||
|
@ -2280,12 +2280,12 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
|
|||
if (pTable == NULL) break;
|
||||
|
||||
if (pTable->superTable == pStable) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfTables++;
|
||||
}
|
||||
|
||||
|
@ -2410,9 +2410,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||
SSWriteMsg desc = {
|
||||
SSdbRow desc = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pRow = pTable,
|
||||
.pObj = pTable,
|
||||
.pTable = tsChildTableSdb,
|
||||
.pMsg = mnodeMsg,
|
||||
.fpRsp = mnodeDoCreateChildTableCb
|
||||
|
@ -2440,8 +2440,8 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid,
|
||||
tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle, mnodeMsg->incomingTs, sec, mnodeMsg->retry);
|
||||
|
||||
SSWriteMsg wmsg = {.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pRow = pTable};
|
||||
sdbDeleteRow(&wmsg);
|
||||
SSdbRow row = {.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pObj = pTable};
|
||||
sdbDeleteRow(&row);
|
||||
|
||||
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) {
|
||||
//Avoid retry again in client
|
||||
|
|
|
@ -42,13 +42,13 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg);
|
|||
static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg);
|
||||
|
||||
static int32_t mnodeUserActionDestroy(SSWriteMsg *pWMsg) {
|
||||
tfree(pWMsg->pRow);
|
||||
static int32_t mnodeUserActionDestroy(SSdbRow *pRow) {
|
||||
tfree(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) {
|
||||
SUserObj *pUser = pWMsg->pRow;
|
||||
static int32_t mnodeUserActionInsert(SSdbRow *pRow) {
|
||||
SUserObj *pUser = pRow->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
|
||||
|
||||
if (pAcct != NULL) {
|
||||
|
@ -62,8 +62,8 @@ static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) {
|
||||
SUserObj *pUser = pWMsg->pRow;
|
||||
static int32_t mnodeUserActionDelete(SSdbRow *pRow) {
|
||||
SUserObj *pUser = pRow->pObj;
|
||||
SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
|
||||
|
||||
if (pAcct != NULL) {
|
||||
|
@ -74,8 +74,8 @@ static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SUserObj *pUser = pWMsg->pRow;
|
||||
static int32_t mnodeUserActionUpdate(SSdbRow *pRow) {
|
||||
SUserObj *pUser = pRow->pObj;
|
||||
SUserObj *pSaved = mnodeGetUser(pUser->user);
|
||||
if (pUser != pSaved) {
|
||||
memcpy(pSaved, pUser, tsUserUpdateSize);
|
||||
|
@ -85,19 +85,19 @@ static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionEncode(SSWriteMsg *pWMsg) {
|
||||
SUserObj *pUser = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pUser, tsUserUpdateSize);
|
||||
pWMsg->rowSize = tsUserUpdateSize;
|
||||
static int32_t mnodeUserActionEncode(SSdbRow *pRow) {
|
||||
SUserObj *pUser = pRow->pObj;
|
||||
memcpy(pRow->rowData, pUser, tsUserUpdateSize);
|
||||
pRow->rowSize = tsUserUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeUserActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeUserActionDecode(SSdbRow *pRow) {
|
||||
SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj));
|
||||
if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pUser, pWMsg->rowData, tsUserUpdateSize);
|
||||
pWMsg->pRow = pUser;
|
||||
memcpy(pUser, pRow->rowData, tsUserUpdateSize);
|
||||
pRow->pObj = pUser;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -205,14 +205,14 @@ void mnodeDecUserRef(SUserObj *pUser) {
|
|||
}
|
||||
|
||||
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsUserSdb,
|
||||
.pRow = pUser,
|
||||
.pObj = pUser,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&wmsg);
|
||||
int32_t code = sdbUpdateRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
|
||||
} else {
|
||||
|
@ -259,15 +259,15 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
|
|||
pUser->superAuth = 1;
|
||||
}
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsUserSdb,
|
||||
.pRow = pUser,
|
||||
.pObj = pUser,
|
||||
.rowSize = sizeof(SUserObj),
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
code = sdbInsertRow(&wmsg);
|
||||
code = sdbInsertRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
|
||||
tfree(pUser);
|
||||
|
@ -279,14 +279,14 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsUserSdb,
|
||||
.pRow = pUser,
|
||||
.pObj = pUser,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
|
||||
} else {
|
||||
|
@ -562,12 +562,12 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
|
|||
if (pUser == NULL) break;
|
||||
|
||||
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsUserSdb,
|
||||
.pRow = pUser,
|
||||
.pObj = pUser,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfUsers++;
|
||||
}
|
||||
|
||||
|
|
|
@ -72,13 +72,13 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) {
|
|||
tfree(pVgroup);
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pWMsg) {
|
||||
mnodeDestroyVgroup(pWMsg->pRow);
|
||||
static int32_t mnodeVgroupActionDestroy(SSdbRow *pRow) {
|
||||
mnodeDestroyVgroup(pRow->pObj);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) {
|
||||
SVgObj *pVgroup = pWMsg->pRow;
|
||||
static int32_t mnodeVgroupActionInsert(SSdbRow *pRow) {
|
||||
SVgObj *pVgroup = pRow->pObj;
|
||||
|
||||
// refer to db
|
||||
SDbObj *pDb = mnodeGetDb(pVgroup->dbName);
|
||||
|
@ -115,8 +115,8 @@ static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) {
|
||||
SVgObj *pVgroup = pWMsg->pRow;
|
||||
static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) {
|
||||
SVgObj *pVgroup = pRow->pObj;
|
||||
|
||||
if (pVgroup->pDb == NULL) {
|
||||
mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName);
|
||||
|
@ -137,8 +137,8 @@ static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) {
|
||||
SVgObj *pNew = pWMsg->pRow;
|
||||
static int32_t mnodeVgroupActionUpdate(SSdbRow *pRow) {
|
||||
SVgObj *pNew = pRow->pObj;
|
||||
SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId);
|
||||
|
||||
if (pVgroup != pNew) {
|
||||
|
@ -176,25 +176,25 @@ static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionEncode(SSWriteMsg *pWMsg) {
|
||||
SVgObj *pVgroup = pWMsg->pRow;
|
||||
memcpy(pWMsg->rowData, pVgroup, tsVgUpdateSize);
|
||||
SVgObj *pTmpVgroup = pWMsg->rowData;
|
||||
static int32_t mnodeVgroupActionEncode(SSdbRow *pRow) {
|
||||
SVgObj *pVgroup = pRow->pObj;
|
||||
memcpy(pRow->rowData, pVgroup, tsVgUpdateSize);
|
||||
SVgObj *pTmpVgroup = pRow->rowData;
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
pTmpVgroup->vnodeGid[i].pDnode = NULL;
|
||||
pTmpVgroup->vnodeGid[i].role = 0;
|
||||
}
|
||||
|
||||
pWMsg->rowSize = tsVgUpdateSize;
|
||||
pRow->rowSize = tsVgUpdateSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeVgroupActionDecode(SSWriteMsg *pWMsg) {
|
||||
static int32_t mnodeVgroupActionDecode(SSdbRow *pRow) {
|
||||
SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj));
|
||||
if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
memcpy(pVgroup, pWMsg->rowData, tsVgUpdateSize);
|
||||
pWMsg->pRow = pVgroup;
|
||||
memcpy(pVgroup, pRow->rowData, tsVgUpdateSize);
|
||||
pRow->pObj = pVgroup;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -253,13 +253,13 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
|
|||
}
|
||||
|
||||
void mnodeUpdateVgroup(SVgObj *pVgroup) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup
|
||||
.pObj = pVgroup
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&wmsg);
|
||||
int32_t code = sdbUpdateRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("vgId:%d, failed to update vgroup", pVgroup->vgId);
|
||||
}
|
||||
|
@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
|
||||
tstrerror(code));
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pVgroup, .pTable = tsVgroupSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
|
||||
sdbDeleteRow(&desc);
|
||||
return code;
|
||||
} else {
|
||||
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
|
||||
pDb->name, pVgroup->numOfVnodes);
|
||||
pVgroup->status = TAOS_VG_STATUS_READY;
|
||||
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pVgroup, .pTable = tsVgroupSdb};
|
||||
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
|
||||
(void)sdbUpdateRow(&desc);
|
||||
|
||||
dnodeReprocessMWriteMsg(pMsg);
|
||||
|
@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
|
||||
// pDb->name, pVgroup->numOfVnodes);
|
||||
// pVgroup->status = TAOS_VG_STATUS_READY;
|
||||
// SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pRow = pVgroup, .pTable = tsVgroupSdb};
|
||||
// SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
|
||||
// (void)sdbUpdateRow(&desc);
|
||||
// dnodeReprocessMWriteMsg(pMsg);
|
||||
// return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
|
@ -571,16 +571,16 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
|
|||
pMsg->pVgroup = pVgroup;
|
||||
mnodeIncVgroupRef(pVgroup);
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup,
|
||||
.pObj = pVgroup,
|
||||
.rowSize = sizeof(SVgObj),
|
||||
.pMsg = pMsg,
|
||||
.fpReq = mnodeCreateVgroupFp
|
||||
};
|
||||
|
||||
code = sdbInsertRow(&wmsg);
|
||||
code = sdbInsertRow(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
pMsg->pVgroup = NULL;
|
||||
mnodeDestroyVgroup(pVgroup);
|
||||
|
@ -595,12 +595,12 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
|
|||
} else {
|
||||
mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||
mnodeSendDropVgroupMsg(pVgroup, NULL);
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup
|
||||
.pObj = pVgroup
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -957,28 +957,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
if (mnodeMsg->received != mnodeMsg->expected) return;
|
||||
|
||||
if (mnodeMsg->received == mnodeMsg->successed) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup,
|
||||
.pObj = pVgroup,
|
||||
.rowSize = sizeof(SVgObj),
|
||||
.pMsg = mnodeMsg,
|
||||
.fpRsp = mnodeCreateVgroupCb
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRowToQueue(&wmsg);
|
||||
int32_t code = sdbInsertRowToQueue(&row);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mnodeMsg->pVgroup = NULL;
|
||||
mnodeDestroyVgroup(pVgroup);
|
||||
dnodeSendRpcMWriteRsp(mnodeMsg, code);
|
||||
}
|
||||
} else {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup
|
||||
.pObj = pVgroup
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code);
|
||||
}
|
||||
}
|
||||
|
@ -1031,12 +1031,12 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
|
||||
if (mnodeMsg->received != mnodeMsg->expected) return;
|
||||
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup
|
||||
.pObj = pVgroup
|
||||
};
|
||||
int32_t code = sdbDeleteRow(&wmsg);
|
||||
int32_t code = sdbDeleteRow(&row);
|
||||
if (code != 0) {
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
}
|
||||
|
@ -1084,12 +1084,12 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
|
|||
|
||||
if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) {
|
||||
mnodeDropAllChildTablesInVgroups(pVgroup);
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup,
|
||||
.pObj = pVgroup,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfVgroups++;
|
||||
}
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
|
@ -1135,12 +1135,12 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
|
|||
if (pVgroup == NULL) break;
|
||||
|
||||
if (pVgroup->pDb == pDropDb) {
|
||||
SSWriteMsg wmsg = {
|
||||
SSdbRow row = {
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.pTable = tsVgroupSdb,
|
||||
.pRow = pVgroup,
|
||||
.pObj = pVgroup,
|
||||
};
|
||||
sdbDeleteRow(&wmsg);
|
||||
sdbDeleteRow(&row);
|
||||
numOfVgroups++;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue