diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 7169e14f03..fb23e252c0 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -52,11 +52,11 @@ typedef enum { typedef struct SSWriteMsg { ESdbOper type; int32_t processedCount; // for sync fwd callback - int32_t retCode; // for callback in sdb queue + int32_t code; // for callback in sdb queue int32_t rowSize; void * rowData; int32_t (*fpReq)(SMnodeMsg *pMsg); - int32_t (*fpWrite)(SMnodeMsg *pMsg, int32_t code); + int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); void * pRow; SMnodeMsg *pMsg; struct SSdbTable *pTable; diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 9696df155a..6590ff0490 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -418,7 +418,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * .pRow = pDb, .rowSize = sizeof(SDbObj), .pMsg = pMsg, - .fpWrite = mnodeCreateDbCb + .fpRsp = mnodeCreateDbCb }; code = sdbInsertRow(&wmsg); @@ -1024,7 +1024,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { .pTable = tsDbSdb, .pRow = pDb, .pMsg = pMsg, - .fpWrite = mnodeAlterDbCb + .fpRsp = mnodeAlterDbCb }; code = sdbUpdateRow(&wmsg); @@ -1076,7 +1076,7 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { .pTable = tsDbSdb, .pRow = pDb, .pMsg = pMsg, - .fpWrite = mnodeDropDbCb + .fpRsp = mnodeDropDbCb }; int32_t code = sdbDeleteRow(&wmsg); diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 33925960be..20e70c1af7 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -329,7 +329,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { .type = SDB_OPER_GLOBAL, .pTable = tsMnodeSdb, .pRow = pMnode, - .fpWrite = mnodeCreateMnodeCb + .fpRsp = mnodeCreateMnodeCb }; int32_t code = TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8056833cdb..850da1f0c3 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -48,6 +48,13 @@ typedef enum { SDB_STATUS_CLOSING = 2 } ESdbStatus; +char *actStr[] = { + "insert", + "delete", + "update", + "invalid" +}; + typedef struct SSdbTable { char tableName[SDB_TABLE_LEN]; ESdbTable tableId; @@ -140,18 +147,6 @@ static void *sdbGetObjKey(SSdbTable *pTable, void *key) { return key; } -static char *sdbGetActionStr(int32_t action) { - switch (action) { - case SDB_ACTION_INSERT: - return "insert"; - case SDB_ACTION_DELETE: - return "delete"; - case SDB_ACTION_UPDATE: - return "update"; - } - return "invalid"; -} - static char *sdbGetKeyStr(SSdbTable *pTable, void *key) { static char str[16]; switch (pTable->keyType) { @@ -167,7 +162,7 @@ static char *sdbGetKeyStr(SSdbTable *pTable, void *key) { } } -static char *sdbGetObjStr(SSdbTable *pTable, void *key) { +static char *sdbGetRowStr(SSdbTable *pTable, void *key) { return sdbGetKeyStr(pTable, sdbGetObjKey(pTable, key)); } @@ -254,33 +249,28 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { } FORCE_INLINE -static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - assert(param); - SSWriteMsg * pWrite = param; - SMnodeMsg *pMsg = pWrite->pMsg; - if (code <= 0) pWrite->retCode = code; +static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { + if (wparam == NULL) return; + SSWriteMsg *pWrite = wparam; + SMnodeMsg * pMsg = pWrite->pMsg; - int32_t processedCount = atomic_add_fetch_32(&pWrite->processedCount, 1); - if (processedCount <= 1) { - if (pMsg != NULL) { - sdbDebug("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, processedCount, code); - } + if (code <= 0) pWrite->code = code; + int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); + if (count <= 1) { + if (pMsg != NULL) sdbTrace("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, count, code); return; - } - - if (pMsg != NULL) { - sdbDebug("vgId:1, msg:%p is confirmed, code:%x", pMsg, code); + } else { + if (pMsg != NULL) sdbTrace("vgId:1, msg:%p is confirmed, code:%x", pMsg, code); } // failed to forward, need revert insert - if (pWrite->retCode != TSDB_CODE_SUCCESS) { - SWalHead *pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; + if (pWrite->code != TSDB_CODE_SUCCESS) { + SWalHead *pHead = (SWalHead *)((char *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); int32_t action = pHead->msgType % 10; - sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pWrite->pRow, - sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode)); + sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pWrite->pRow, + sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, actStr[action], tstrerror(pWrite->code)); if (action == SDB_ACTION_INSERT) { // It's better to create a table in two stages, create it first and then set it success - //sdbDeleteHash(pWrite->pTable, pWrite); SSWriteMsg wmsg = { .type = SDB_OPER_GLOBAL, .pTable = pWrite->pTable, @@ -290,10 +280,10 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { } } - if (pWrite->fpWrite != NULL) { - pWrite->retCode = (*pWrite->fpWrite)(pMsg, pWrite->retCode); + if (pWrite->fpRsp != NULL) { + pWrite->code = (*pWrite->fpRsp)(pMsg, pWrite->code); } - dnodeSendRpcMWriteRsp(pMsg, pWrite->retCode); + dnodeSendRpcMWriteRsp(pMsg, pWrite->code); // if ahandle, means this func is called by sdb write if (ahandle == NULL) { @@ -449,7 +439,7 @@ void sdbIncRef(void *tparam, void *pRow) { SSdbTable *pTable = tparam; int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t refCount = atomic_add_fetch_32(pRefCount, 1); - sdbTrace("vgId:1, sdb:%s, inc ref to key:%p:%s:%d", pTable->tableName, pRow, sdbGetObjStr(pTable, pRow), refCount); + sdbTrace("vgId:1, sdb:%s, inc ref to row:%p:%s:%d", pTable->tableName, pRow, sdbGetRowStr(pTable, pRow), refCount); } void sdbDecRef(void *tparam, void *pRow) { @@ -458,11 +448,11 @@ void sdbDecRef(void *tparam, void *pRow) { SSdbTable *pTable = tparam; int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - sdbTrace("vgId:1, sdb:%s, dec ref to key:%p:%s:%d", pTable->tableName, pRow, sdbGetObjStr(pTable, pRow), refCount); + sdbTrace("vgId:1, sdb:%s, dec ref to row:%p:%s:%d", pTable->tableName, pRow, sdbGetRowStr(pTable, pRow), refCount); int32_t *updateEnd = pRow + pTable->refCountPos - 4; if (refCount <= 0 && *updateEnd) { - sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pRow, sdbGetObjStr(pTable, pRow), refCount); + sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->tableName, pRow, sdbGetRowStr(pTable, pRow), refCount); SSWriteMsg wmsg = {.pRow = pRow}; (*pTable->fpDestroy)(&wmsg); } @@ -523,12 +513,12 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) { } sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->tableName, - sdbGetObjStr(pTable, pWrite->pRow), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg); + sdbGetRowStr(pTable, pWrite->pRow), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg); int32_t code = (*pTable->fpInsert)(pWrite); if (code != TSDB_CODE_SUCCESS) { sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->tableName, - sdbGetObjStr(pTable, pWrite->pRow)); + sdbGetRowStr(pTable, pWrite->pRow)); sdbDeleteHash(pTable, pWrite); } @@ -540,7 +530,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0; if (!set) { sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->tableName, - sdbGetObjStr(pTable, pWrite->pRow)); + sdbGetRowStr(pTable, pWrite->pRow)); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } @@ -559,7 +549,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { atomic_sub_fetch_32(&pTable->numOfRows, 1); sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, - sdbGetObjStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); + sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); sdbDecRef(pTable, pWrite->pRow); @@ -568,7 +558,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) { sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, - sdbGetObjStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); + sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); (*pTable->fpUpdate)(pWrite); return TSDB_CODE_SUCCESS; @@ -594,12 +584,12 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { if (pHead->version <= tsSdbMgmt.version) { pthread_mutex_unlock(&tsSdbMgmt.mutex); sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version); + pTable->tableName, actStr[action], sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version); return TSDB_CODE_SUCCESS; } else if (pHead->version != tsSdbMgmt.version + 1) { pthread_mutex_unlock(&tsSdbMgmt.mutex); sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version); + pTable->tableName, actStr[action], sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version); return TSDB_CODE_SYN_INVALID_VERSION; } else { tsSdbMgmt.version = pHead->version; @@ -623,19 +613,19 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { if (syncCode < 0) { sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); + tstrerror(syncCode), actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); } else if (syncCode > 0) { sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); + actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); } else { sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); + actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); } return syncCode; } sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); // even it is WAL/FWD, it shall be called to update version in sync syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC); @@ -675,7 +665,7 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) { if (sdbGetRowFromObj(pTable, pWrite->pRow)) { sdbError("vgId:1, sdb:%s, failed to insert key:%s, already exist", pTable->tableName, - sdbGetObjStr(pTable, pWrite->pRow)); + sdbGetRowStr(pTable, pWrite->pRow)); sdbDecRef(pTable, pWrite->pRow); return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; } @@ -712,9 +702,9 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; - SSWriteMsg *pNewOper = taosAllocateQitem(size); + SSWriteMsg *pNewWrite = taosAllocateQitem(size); - SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK; + SWalHead *pHead = (SWalHead *)((char *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); pHead->version = 0; pHead->len = pWrite->rowSize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; @@ -723,15 +713,15 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { (*pTable->fpEncode)(pWrite); pHead->len = pWrite->rowSize; - memcpy(pNewOper, pWrite, sizeof(SSWriteMsg)); + memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg)); - if (pNewOper->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, - pNewOper->pMsg, pTable->tableName, pWrite->pRow, sdbGetObjStr(pTable, pWrite->pRow)); + if (pNewWrite->pMsg != NULL) { + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, insert action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle, + pNewWrite->pMsg, pTable->tableName, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow)); } - sdbIncRef(pNewOper->pTable, pNewOper->pRow); - taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); + sdbIncRef(pNewWrite->pTable, pNewWrite->pRow); + taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -781,9 +771,9 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; - SSWriteMsg *pNewOper = taosAllocateQitem(size); + SSWriteMsg *pNewWrite = taosAllocateQitem(size); - SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK; + SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); pHead->version = 0; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; @@ -791,14 +781,14 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { (*pTable->fpEncode)(pWrite); pHead->len = pWrite->rowSize; - memcpy(pNewOper, pWrite, sizeof(SSWriteMsg)); + memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg)); - if (pNewOper->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, - pNewOper->pMsg, pTable->tableName, pWrite->pRow, sdbGetObjStr(pTable, pWrite->pRow)); + if (pNewWrite->pMsg != NULL) { + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, delete action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle, + pNewWrite->pMsg, pTable->tableName, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow)); } - taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); + taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -836,9 +826,9 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; - SSWriteMsg *pNewOper = taosAllocateQitem(size); + SSWriteMsg *pNewWrite = taosAllocateQitem(size); - SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK; + SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); pHead->version = 0; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; @@ -846,15 +836,15 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { (*pTable->fpEncode)(pWrite); pHead->len = pWrite->rowSize; - memcpy(pNewOper, pWrite, sizeof(SSWriteMsg)); + memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg)); - if (pNewOper->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, - pNewOper->pMsg, pTable->tableName, pWrite->pRow, sdbGetObjStr(pTable, pWrite->pRow)); + if (pNewWrite->pMsg != NULL) { + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, update action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle, + pNewWrite->pMsg, pTable->tableName, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow)); } - sdbIncRef(pNewOper->pTable, pNewOper->pRow); - taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); + sdbIncRef(pNewWrite->pTable, pNewWrite->pRow); + taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1071,7 +1061,7 @@ static void *sdbWorkerFp(void *pWorker) { pWrite->processedCount = 1; pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; if (pWrite->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->tableName, pWrite->pRow, sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version); } @@ -1083,7 +1073,7 @@ static void *sdbWorkerFp(void *pWorker) { int32_t code = sdbWrite(pWrite, pHead, qtype, NULL); if (code > 0) code = 0; if (pWrite) { - pWrite->retCode = code; + pWrite->code = code; } else { pHead->len = code; // hackway } @@ -1098,7 +1088,7 @@ static void *sdbWorkerFp(void *pWorker) { if (qtype == TAOS_QTYPE_RPC) { pWrite = (SSWriteMsg *)item; - sdbConfirmForward(NULL, pWrite, pWrite->retCode); + sdbConfirmForward(NULL, pWrite, pWrite->code); } else if (qtype == TAOS_QTYPE_FWD) { pHead = (SWalHead *)item; syncConfirmForward(tsSdbMgmt.sync, pHead->version, pHead->len); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index fc441bf524..ce9f652330 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -879,12 +879,12 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { mnodeIncTableRef(pMsg->pTable); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, .rowSize = sizeof(SSTableObj) + schemaSize, - .pMsg = pMsg, - .fpWrite = mnodeCreateSuperTableCb + .pMsg = pMsg, + .fpRsp = mnodeCreateSuperTableCb }; int32_t code = sdbInsertRow(&wmsg); @@ -942,7 +942,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { .pTable = tsSuperTableSdb, .pRow = pStable, .pMsg = pMsg, - .fpWrite = mnodeDropSuperTableCb + .fpRsp = mnodeDropSuperTableCb }; int32_t code = sdbDeleteRow(&wmsg); @@ -1011,11 +1011,11 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t schema[0].name); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, - .pMsg = pMsg, - .fpWrite = mnodeAddSuperTableTagCb + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, + .pMsg = pMsg, + .fpRsp = mnodeAddSuperTableTagCb }; return sdbUpdateRow(&wmsg); @@ -1045,11 +1045,11 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, - .pMsg = pMsg, - .fpWrite = mnodeDropSuperTableTagCb + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, + .pMsg = pMsg, + .fpRsp = mnodeDropSuperTableTagCb }; return sdbUpdateRow(&wmsg); @@ -1089,11 +1089,11 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c oldTagName, newTagName); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, - .pMsg = pMsg, - .fpWrite = mnodeModifySuperTableTagNameCb + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, + .pMsg = pMsg, + .fpRsp = mnodeModifySuperTableTagNameCb }; return sdbUpdateRow(&wmsg); @@ -1163,11 +1163,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, - .pMsg = pMsg, - .fpWrite = mnodeAddSuperTableColumnCb + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, + .pMsg = pMsg, + .fpRsp = mnodeAddSuperTableColumnCb }; return sdbUpdateRow(&wmsg); @@ -1208,11 +1208,11 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, - .pMsg = pMsg, - .fpWrite = mnodeDropSuperTableColumnCb + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, + .pMsg = pMsg, + .fpRsp = mnodeDropSuperTableColumnCb }; return sdbUpdateRow(&wmsg); @@ -1252,11 +1252,11 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char oldName, newName); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsSuperTableSdb, - .pRow = pStable, - .pMsg = pMsg, - .fpWrite = mnodeChangeSuperTableColumnCb + .type = SDB_OPER_GLOBAL, + .pTable = tsSuperTableSdb, + .pRow = pStable, + .pMsg = pMsg, + .fpRsp = mnodeChangeSuperTableColumnCb }; return sdbUpdateRow(&wmsg); @@ -1902,11 +1902,11 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { } SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsChildTableSdb, - .pRow = pTable, - .pMsg = pMsg, - .fpWrite = mnodeDropChildTableCb + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pRow = pTable, + .pMsg = pMsg, + .fpRsp = mnodeDropChildTableCb }; int32_t code = sdbDeleteRow(&wmsg); @@ -2006,11 +2006,11 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsChildTableSdb, - .pRow = pTable, - .pMsg = pMsg, - .fpWrite = mnodeAlterNormalTableColumnCb + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pRow = pTable, + .pMsg = pMsg, + .fpRsp = mnodeAlterNormalTableColumnCb }; return sdbUpdateRow(&wmsg); @@ -2039,11 +2039,11 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsChildTableSdb, - .pRow = pTable, - .pMsg = pMsg, - .fpWrite = mnodeAlterNormalTableColumnCb + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pRow = pTable, + .pMsg = pMsg, + .fpRsp = mnodeAlterNormalTableColumnCb }; return sdbUpdateRow(&wmsg); @@ -2076,11 +2076,11 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char oldName, newName); SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsChildTableSdb, - .pRow = pTable, - .pMsg = pMsg, - .fpWrite = mnodeAlterNormalTableColumnCb + .type = SDB_OPER_GLOBAL, + .pTable = tsChildTableSdb, + .pRow = pTable, + .pMsg = pMsg, + .fpRsp = mnodeAlterNormalTableColumnCb }; return sdbUpdateRow(&wmsg); @@ -2411,11 +2411,11 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { SSWriteMsg desc = { - .type = SDB_OPER_GLOBAL, - .pRow = pTable, - .pTable = tsChildTableSdb, - .pMsg = mnodeMsg, - .fpWrite = mnodeDoCreateChildTableCb + .type = SDB_OPER_GLOBAL, + .pRow = pTable, + .pTable = tsChildTableSdb, + .pMsg = mnodeMsg, + .fpRsp = mnodeDoCreateChildTableCb }; int32_t code = sdbInsertRowImp(&desc); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 9da96a8acf..c12d2057f8 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -958,12 +958,12 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received == mnodeMsg->successed) { SSWriteMsg wmsg = { - .type = SDB_OPER_GLOBAL, - .pTable = tsVgroupSdb, - .pRow = pVgroup, - .rowSize = sizeof(SVgObj), - .pMsg = mnodeMsg, - .fpWrite = mnodeCreateVgroupCb + .type = SDB_OPER_GLOBAL, + .pTable = tsVgroupSdb, + .pRow = pVgroup, + .rowSize = sizeof(SVgObj), + .pMsg = mnodeMsg, + .fpRsp = mnodeCreateVgroupCb }; int32_t code = sdbInsertRowImp(&wmsg);