|
|
|
@ -34,7 +34,7 @@
|
|
|
|
|
#include "mnodeSdb.h"
|
|
|
|
|
|
|
|
|
|
#define SDB_TABLE_LEN 12
|
|
|
|
|
#define SDB_SYNC_HACK 16
|
|
|
|
|
#define MAX_QUEUED_MSG_NUM 10000
|
|
|
|
|
|
|
|
|
|
typedef enum {
|
|
|
|
|
SDB_ACTION_INSERT = 0,
|
|
|
|
@ -82,6 +82,7 @@ typedef struct {
|
|
|
|
|
int64_t sync;
|
|
|
|
|
void * wal;
|
|
|
|
|
SSyncCfg cfg;
|
|
|
|
|
int32_t queuedMsg;
|
|
|
|
|
int32_t numOfTables;
|
|
|
|
|
SSdbTable *tableList[SDB_TABLE_MAX];
|
|
|
|
|
pthread_mutex_t mutex;
|
|
|
|
@ -105,16 +106,14 @@ static taos_qall tsSdbWQall;
|
|
|
|
|
static taos_queue tsSdbWQueue;
|
|
|
|
|
static SSdbWorkerPool tsSdbPool;
|
|
|
|
|
|
|
|
|
|
static int32_t sdbWrite(void *pWrite, void *pHead, int32_t qtype, void *unused);
|
|
|
|
|
static int32_t sdbWriteToQueue(void *pWrite, void *pHead, int32_t qtype, void *unused);
|
|
|
|
|
static int32_t sdbProcessWrite(void *pWrite, void *pHead, int32_t qtype, void *unused);
|
|
|
|
|
static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam);
|
|
|
|
|
static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action);
|
|
|
|
|
static void * sdbWorkerFp(void *pWorker);
|
|
|
|
|
static int32_t sdbInitWorker();
|
|
|
|
|
static void sdbCleanupWorker();
|
|
|
|
|
static int32_t sdbAllocQueue();
|
|
|
|
|
static void sdbFreeQueue();
|
|
|
|
|
extern int32_t sdbInsertRowImp(SSWriteMsg *pWrite);
|
|
|
|
|
static int32_t sdbUpdateRowImp(SSWriteMsg *pWrite);
|
|
|
|
|
static int32_t sdbDeleteRowImp(SSWriteMsg *pWrite);
|
|
|
|
|
static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
|
|
|
|
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
|
|
|
|
static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite);
|
|
|
|
@ -181,7 +180,7 @@ static int32_t sdbInitWal() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbInfo("vgId:1, open wal for restore");
|
|
|
|
|
int code = walRestore(tsSdbMgmt.wal, NULL, sdbWrite);
|
|
|
|
|
int32_t code = walRestore(tsSdbMgmt.wal, NULL, sdbProcessWrite);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
sdbError("vgId:1, failed to open wal for restore since %s", tstrerror(code));
|
|
|
|
|
return -1;
|
|
|
|
@ -250,7 +249,7 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
|
|
|
|
|
|
|
|
|
|
// failed to forward, need revert insert
|
|
|
|
|
static void sdbHandleFailedConfirm(SSWriteMsg *pWrite) {
|
|
|
|
|
SWalHead *pHead = (SWalHead *)((char *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
|
|
|
|
|
SWalHead *pHead = pWrite->pHead;
|
|
|
|
|
int32_t action = pHead->msgType % 10;
|
|
|
|
|
|
|
|
|
|
sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pWrite->pRow,
|
|
|
|
@ -285,13 +284,6 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dnodeSendRpcMWriteRsp(pMsg, pWrite->code);
|
|
|
|
|
|
|
|
|
|
// if ahandle, means this func is called by sdb write
|
|
|
|
|
if (ahandle == NULL) {
|
|
|
|
|
sdbDecRef(pWrite->pTable, pWrite->pRow);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosFreeQitem(pWrite);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
|
|
|
|
@ -379,7 +371,7 @@ void sdbUpdateSync(void *pMnodes) {
|
|
|
|
|
syncInfo.ahandle = NULL;
|
|
|
|
|
syncInfo.getWalInfo = sdbGetWalInfo;
|
|
|
|
|
syncInfo.getFileInfo = sdbGetFileInfo;
|
|
|
|
|
syncInfo.writeToCache = sdbWriteToQueue;
|
|
|
|
|
syncInfo.writeToCache = sdbWriteWalToQueue;
|
|
|
|
|
syncInfo.confirmForward = sdbConfirmForward;
|
|
|
|
|
syncInfo.notifyRole = sdbNotifyRole;
|
|
|
|
|
tsSdbMgmt.cfg = syncCfg;
|
|
|
|
@ -389,6 +381,7 @@ void sdbUpdateSync(void *pMnodes) {
|
|
|
|
|
} else {
|
|
|
|
|
tsSdbMgmt.sync = syncStart(&syncInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbUpdateMnodeRoles();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -565,7 +558,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
|
|
|
|
|
static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
|
|
|
|
|
SSWriteMsg *pWrite = wparam;
|
|
|
|
|
SWalHead *pHead = hparam;
|
|
|
|
|
int32_t tableId = pHead->msgType / 10;
|
|
|
|
@ -665,8 +658,7 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
|
|
|
|
|
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
|
|
|
|
|
|
|
|
|
if (sdbGetRowFromObj(pTable, pWrite->pRow)) {
|
|
|
|
|
sdbError("vgId:1, sdb:%s, failed to insert key:%s since it already exist", pTable->name,
|
|
|
|
|
sdbGetRowStr(pTable, pWrite->pRow));
|
|
|
|
|
sdbError("vgId:1, sdb:%s, failed to insert:%s since it exist", pTable->name, sdbGetRowStr(pTable, pWrite->pRow));
|
|
|
|
|
sdbDecRef(pTable, pWrite->pRow);
|
|
|
|
|
return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE;
|
|
|
|
|
}
|
|
|
|
@ -675,14 +667,14 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
|
|
|
|
|
*((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
|
|
|
|
|
|
|
|
|
// let vgId increase from 2
|
|
|
|
|
if (pTable->autoIndex == 1 && strcmp(pTable->name, "vgroups") == 0) {
|
|
|
|
|
if (pTable->autoIndex == 1 && pTable->id == SDB_TABLE_VGROUP) {
|
|
|
|
|
*((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = sdbInsertHash(pTable, pWrite);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
sdbError("vgId:1, sdb:%s, failed to insert into hash", pTable->name);
|
|
|
|
|
sdbError("vgId:1, sdb:%s, failed to insert:%s into hash", pTable->name, sdbGetRowStr(pTable, pWrite->pRow));
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -694,39 +686,10 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
|
|
|
|
|
if (pWrite->fpReq) {
|
|
|
|
|
return (*pWrite->fpReq)(pWrite->pMsg);
|
|
|
|
|
} else {
|
|
|
|
|
return sdbInsertRowImp(pWrite);
|
|
|
|
|
return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
|
|
|
|
|
SSdbTable *pTable = pWrite->pTable;
|
|
|
|
|
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
|
|
|
|
|
|
|
|
|
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
|
|
|
|
SSWriteMsg *pNewWrite = taosAllocateQitem(size);
|
|
|
|
|
|
|
|
|
|
SWalHead *pHead = (SWalHead *)((char *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
|
|
|
|
|
pHead->version = 0;
|
|
|
|
|
pHead->len = pWrite->rowSize;
|
|
|
|
|
pHead->msgType = pTable->id * 10 + SDB_ACTION_INSERT;
|
|
|
|
|
|
|
|
|
|
pWrite->rowData = pHead->cont;
|
|
|
|
|
(*pTable->fpEncode)(pWrite);
|
|
|
|
|
pHead->len = pWrite->rowSize;
|
|
|
|
|
|
|
|
|
|
memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg));
|
|
|
|
|
|
|
|
|
|
if (pNewWrite->pMsg != NULL) {
|
|
|
|
|
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, insert action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle,
|
|
|
|
|
pNewWrite->pMsg, pTable->name, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbIncRef(pNewWrite->pTable, pNewWrite->pRow);
|
|
|
|
|
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool sdbCheckRowDeleted(void *tparam, void *pRow) {
|
|
|
|
|
SSdbTable *pTable = tparam;
|
|
|
|
|
if (pTable == NULL) return false;
|
|
|
|
@ -745,55 +708,24 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
|
|
|
|
|
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbIncRef(pTable, pWrite->pRow);
|
|
|
|
|
|
|
|
|
|
int32_t code = sdbDeleteHash(pTable, pWrite);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
sdbError("vgId:1, sdb:%s, failed to delete from hash", pTable->name);
|
|
|
|
|
sdbDecRef(pTable, pWrite->pRow);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// just delete data from memory
|
|
|
|
|
if (pWrite->type != SDB_OPER_GLOBAL) {
|
|
|
|
|
sdbDecRef(pTable, pWrite->pRow);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pWrite->fpReq) {
|
|
|
|
|
return (*pWrite->fpReq)(pWrite->pMsg);
|
|
|
|
|
} else {
|
|
|
|
|
return sdbDeleteRowImp(pWrite);
|
|
|
|
|
return sdbWriteRowToQueue(pWrite, SDB_ACTION_DELETE);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
|
|
|
|
|
SSdbTable *pTable = pWrite->pTable;
|
|
|
|
|
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
|
|
|
|
|
|
|
|
|
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
|
|
|
|
SSWriteMsg *pNewWrite = taosAllocateQitem(size);
|
|
|
|
|
|
|
|
|
|
SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
|
|
|
|
|
pHead->version = 0;
|
|
|
|
|
pHead->msgType = pTable->id * 10 + SDB_ACTION_DELETE;
|
|
|
|
|
|
|
|
|
|
pWrite->rowData = pHead->cont;
|
|
|
|
|
(*pTable->fpEncode)(pWrite);
|
|
|
|
|
pHead->len = pWrite->rowSize;
|
|
|
|
|
|
|
|
|
|
memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg));
|
|
|
|
|
|
|
|
|
|
if (pNewWrite->pMsg != NULL) {
|
|
|
|
|
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, delete action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle,
|
|
|
|
|
pNewWrite->pMsg, pTable->name, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
|
|
|
|
|
SSdbTable *pTable = pWrite->pTable;
|
|
|
|
|
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
|
|
|
@ -818,38 +750,10 @@ int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
|
|
|
|
|
if (pWrite->fpReq) {
|
|
|
|
|
return (*pWrite->fpReq)(pWrite->pMsg);
|
|
|
|
|
} else {
|
|
|
|
|
return sdbUpdateRowImp(pWrite);
|
|
|
|
|
return sdbWriteRowToQueue(pWrite, SDB_ACTION_UPDATE);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
|
|
|
|
|
SSdbTable *pTable = pWrite->pTable;
|
|
|
|
|
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
|
|
|
|
|
|
|
|
|
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
|
|
|
|
SSWriteMsg *pNewWrite = taosAllocateQitem(size);
|
|
|
|
|
|
|
|
|
|
SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
|
|
|
|
|
pHead->version = 0;
|
|
|
|
|
pHead->msgType = pTable->id * 10 + SDB_ACTION_UPDATE;
|
|
|
|
|
|
|
|
|
|
pWrite->rowData = pHead->cont;
|
|
|
|
|
(*pTable->fpEncode)(pWrite);
|
|
|
|
|
pHead->len = pWrite->rowSize;
|
|
|
|
|
|
|
|
|
|
memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg));
|
|
|
|
|
|
|
|
|
|
if (pNewWrite->pMsg != NULL) {
|
|
|
|
|
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, update action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle,
|
|
|
|
|
pNewWrite->pMsg, pTable->name, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbIncRef(pNewWrite->pTable, pNewWrite->pRow);
|
|
|
|
|
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void *sdbFetchRow(void *tparam, void *pNode, void **ppRow) {
|
|
|
|
|
SSdbTable *pTable = tparam;
|
|
|
|
|
*ppRow = NULL;
|
|
|
|
@ -942,7 +846,7 @@ void sdbCloseTable(void *handle) {
|
|
|
|
|
free(pTable);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbInitWorker() {
|
|
|
|
|
static int32_t sdbInitWorker() {
|
|
|
|
|
tsSdbPool.num = 1;
|
|
|
|
|
tsSdbPool.worker = calloc(sizeof(SSdbWorker), tsSdbPool.num);
|
|
|
|
|
|
|
|
|
@ -958,7 +862,7 @@ int32_t sdbInitWorker() {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void sdbCleanupWorker() {
|
|
|
|
|
static void sdbCleanupWorker() {
|
|
|
|
|
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
|
|
|
|
SSdbWorker *pWorker = tsSdbPool.worker + i;
|
|
|
|
|
if (pWorker->thread) {
|
|
|
|
@ -979,7 +883,7 @@ void sdbCleanupWorker() {
|
|
|
|
|
mInfo("vgId:1, sdb write is closed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbAllocQueue() {
|
|
|
|
|
static int32_t sdbAllocQueue() {
|
|
|
|
|
tsSdbWQueue = taosOpenQueue();
|
|
|
|
|
if (tsSdbWQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
|
@ -1021,7 +925,7 @@ int32_t sdbAllocQueue() {
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void sdbFreeQueue() {
|
|
|
|
|
static void sdbFreeQueue() {
|
|
|
|
|
taosCloseQueue(tsSdbWQueue);
|
|
|
|
|
taosFreeQall(tsSdbWQall);
|
|
|
|
|
taosCloseQset(tsSdbWQset);
|
|
|
|
@ -1030,54 +934,96 @@ void sdbFreeQueue() {
|
|
|
|
|
tsSdbWQueue = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbWriteToQueue(void *wparam, void *hparam, int32_t qtype, void *unsed) {
|
|
|
|
|
SWalHead *pHead = hparam;
|
|
|
|
|
int32_t size = sizeof(SWalHead) + pHead->len;
|
|
|
|
|
SWalHead *pWal = taosAllocateQitem(size);
|
|
|
|
|
memcpy(pWal, pHead, size);
|
|
|
|
|
static int32_t sdbWriteToQueue(SSWriteMsg *pWrite, int32_t qtype) {
|
|
|
|
|
SWalHead *pHead = pWrite->pHead;
|
|
|
|
|
|
|
|
|
|
taosWriteQitem(tsSdbWQueue, qtype, pWal);
|
|
|
|
|
return 0;
|
|
|
|
|
if (pHead->len > TSDB_MAX_WAL_SIZE) {
|
|
|
|
|
sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version);
|
|
|
|
|
taosFreeQitem(pWrite);
|
|
|
|
|
return TSDB_CODE_WAL_SIZE_LIMIT;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t queued = atomic_add_fetch_32(&tsSdbMgmt.queuedMsg, 1);
|
|
|
|
|
if (queued > MAX_QUEUED_MSG_NUM) {
|
|
|
|
|
sdbDebug("vgId:1, too many msg:%d in sdb queue, flow control", queued);
|
|
|
|
|
taosMsleep(1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbIncRef(pWrite->pTable, pWrite->pRow);
|
|
|
|
|
|
|
|
|
|
sdbTrace("vgId:1, msg:%p write into to sdb queue", pWrite->pMsg);
|
|
|
|
|
taosWriteQitem(tsSdbWQueue, qtype, pWrite);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void sdbFreeFromQueue(SSWriteMsg *pWrite) {
|
|
|
|
|
int32_t queued = atomic_sub_fetch_32(&tsSdbMgmt.queuedMsg, 1);
|
|
|
|
|
sdbTrace("vgId:1, msg:%p free from sdb queue, queued:%d", pWrite->pMsg, queued);
|
|
|
|
|
|
|
|
|
|
sdbDecRef(pWrite->pTable, pWrite->pRow);
|
|
|
|
|
taosFreeQitem(pWrite);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
|
|
|
|
|
SWalHead *pHead = wparam;
|
|
|
|
|
|
|
|
|
|
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pHead->len;
|
|
|
|
|
SSWriteMsg *pWrite = taosAllocateQitem(size);
|
|
|
|
|
if (pWrite == NULL) {
|
|
|
|
|
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return sdbWriteToQueue(pWrite, qtype);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action) {
|
|
|
|
|
SSdbTable *pTable = pInputWrite->pTable;
|
|
|
|
|
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
|
|
|
|
|
|
|
|
|
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize;
|
|
|
|
|
SSWriteMsg *pWrite = taosAllocateQitem(size);
|
|
|
|
|
if (pWrite == NULL) {
|
|
|
|
|
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
memcpy(pWrite, pInputWrite, sizeof(SSWriteMsg));
|
|
|
|
|
pWrite->processedCount = 1;
|
|
|
|
|
|
|
|
|
|
SWalHead *pHead = pWrite->pHead;
|
|
|
|
|
pWrite->rowData = pHead->cont;
|
|
|
|
|
(*pTable->fpEncode)(pWrite);
|
|
|
|
|
|
|
|
|
|
pHead->len = pWrite->rowSize;
|
|
|
|
|
pHead->version = 0;
|
|
|
|
|
pHead->msgType = pTable->id * 10 + action;
|
|
|
|
|
|
|
|
|
|
return sdbWriteToQueue(pWrite, TAOS_QTYPE_RPC);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite) { return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT); }
|
|
|
|
|
|
|
|
|
|
static void *sdbWorkerFp(void *pWorker) {
|
|
|
|
|
SWalHead *pHead;
|
|
|
|
|
SSWriteMsg *pWrite;
|
|
|
|
|
int32_t qtype;
|
|
|
|
|
int32_t numOfMsgs;
|
|
|
|
|
void * item;
|
|
|
|
|
void * unUsed;
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
|
|
|
|
|
int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
|
|
|
|
|
if (numOfMsgs == 0) {
|
|
|
|
|
sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWQset);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
|
|
|
|
taosGetQitem(tsSdbWQall, &qtype, &item);
|
|
|
|
|
if (qtype == TAOS_QTYPE_RPC) {
|
|
|
|
|
pWrite = (SSWriteMsg *)item;
|
|
|
|
|
pWrite->processedCount = 1;
|
|
|
|
|
pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
|
|
|
|
|
if (pWrite->pMsg != NULL) {
|
|
|
|
|
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s hver:%" PRIu64 ", will be processed in sdb queue",
|
|
|
|
|
pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->name, pWrite->pRow,
|
|
|
|
|
sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
pHead = (SWalHead *)item;
|
|
|
|
|
pWrite = NULL;
|
|
|
|
|
}
|
|
|
|
|
taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite);
|
|
|
|
|
sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pWrite->pMsg, pWrite->pRow,
|
|
|
|
|
pWrite->pHead->version);
|
|
|
|
|
|
|
|
|
|
int32_t code = sdbWrite(pWrite, pHead, qtype, NULL);
|
|
|
|
|
if (code > 0) code = 0;
|
|
|
|
|
if (pWrite) {
|
|
|
|
|
pWrite->code = code;
|
|
|
|
|
} else {
|
|
|
|
|
pHead->len = code; // hackway
|
|
|
|
|
}
|
|
|
|
|
pWrite->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pWrite : NULL, pWrite->pHead, qtype, NULL);
|
|
|
|
|
if (pWrite->code > 0) pWrite->code = 0;
|
|
|
|
|
|
|
|
|
|
sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pWrite->pMsg, pWrite->code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
walFsync(tsSdbMgmt.wal, true);
|
|
|
|
@ -1085,18 +1031,16 @@ static void *sdbWorkerFp(void *pWorker) {
|
|
|
|
|
// browse all items, and process them one by one
|
|
|
|
|
taosResetQitems(tsSdbWQall);
|
|
|
|
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
|
|
|
|
taosGetQitem(tsSdbWQall, &qtype, &item);
|
|
|
|
|
taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite);
|
|
|
|
|
|
|
|
|
|
if (qtype == TAOS_QTYPE_RPC) {
|
|
|
|
|
pWrite = (SSWriteMsg *)item;
|
|
|
|
|
sdbConfirmForward(NULL, pWrite, pWrite->code);
|
|
|
|
|
} else if (qtype == TAOS_QTYPE_FWD) {
|
|
|
|
|
pHead = (SWalHead *)item;
|
|
|
|
|
syncConfirmForward(tsSdbMgmt.sync, pHead->version, pHead->len);
|
|
|
|
|
taosFreeQitem(item);
|
|
|
|
|
syncConfirmForward(tsSdbMgmt.sync, pWrite->pHead->version, pWrite->code);
|
|
|
|
|
} else {
|
|
|
|
|
taosFreeQitem(item);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sdbFreeFromQueue(pWrite);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|