commit
bd934402e7
|
@ -237,13 +237,18 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
pHead->msgType = pWrite->rpcMsg.msgType;
|
pHead->msgType = pWrite->rpcMsg.msgType;
|
||||||
pHead->version = 0;
|
pHead->version = 0;
|
||||||
pHead->len = pWrite->contLen;
|
pHead->len = pWrite->contLen;
|
||||||
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
|
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
|
||||||
|
taosMsg[pWrite->rpcMsg.msgType]);
|
||||||
} else {
|
} else {
|
||||||
pHead = (SWalHead *)item;
|
pHead = (SWalHead *)item;
|
||||||
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version);
|
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
||||||
|
pHead->version);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
||||||
|
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
|
||||||
|
pHead->version, tstrerror(code));
|
||||||
|
|
||||||
if (pWrite) {
|
if (pWrite) {
|
||||||
pWrite->rpcMsg.code = code;
|
pWrite->rpcMsg.code = code;
|
||||||
if (code <= 0) pWrite->processedCount = 1;
|
if (code <= 0) pWrite->processedCount = 1;
|
||||||
|
|
|
@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores
|
||||||
// sync
|
// sync
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
||||||
|
|
|
@ -594,7 +594,7 @@ static int sdbWrite(void *param, void *data, int type) {
|
||||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||||
sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
|
sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
|
||||||
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
|
||||||
return TSDB_CODE_MND_APP_ERROR;
|
return TSDB_CODE_SYN_INVALID_VERSION;
|
||||||
} else {
|
} else {
|
||||||
tsSdbObj.version = pHead->version;
|
tsSdbObj.version = pHead->version;
|
||||||
}
|
}
|
||||||
|
|
|
@ -313,7 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
||||||
|
|
||||||
// always update version
|
// always update version
|
||||||
nodeVersion = pWalHead->version;
|
nodeVersion = pWalHead->version;
|
||||||
sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype);
|
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
|
||||||
|
qtype, pWalHead->version);
|
||||||
|
|
||||||
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
|
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
|
||||||
|
|
||||||
|
@ -883,7 +884,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncNode * pNode = pPeer->pSyncNode;
|
SSyncNode * pNode = pPeer->pSyncNode;
|
||||||
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
|
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
|
||||||
|
|
||||||
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
|
sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
|
||||||
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
|
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
|
||||||
|
|
||||||
pPeer->version = pPeersStatus->version;
|
pPeer->version = pPeersStatus->version;
|
||||||
|
@ -970,7 +971,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
||||||
|
|
||||||
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
|
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
|
||||||
if (retLen == statusMsgLen) {
|
if (retLen == statusMsgLen) {
|
||||||
sDebug("%s, status msg is sent", pPeer->id);
|
sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role],
|
||||||
|
pPeersStatus->version, pPeersStatus->ack);
|
||||||
} else {
|
} else {
|
||||||
sDebug("%s, failed to send status msg, restart", pPeer->id);
|
sDebug("%s, failed to send status msg, restart", pPeer->id);
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
|
|
|
@ -54,13 +54,13 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
|
||||||
|
|
||||||
// TODO: Later, let slave to support query
|
// TODO: Later, let slave to support query
|
||||||
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
|
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
|
||||||
|
pVnode->syncCfg.replica, syncRole[pVnode->role]);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
|
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
|
||||||
if (qhandle == NULL || *qhandle == NULL) {
|
if (qhandle == NULL || *qhandle == NULL) {
|
||||||
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
} else {
|
} else {
|
||||||
assert(*qhandle == (void *)killQueryMsg->qhandle);
|
assert(*qhandle == (void *)killQueryMsg->qhandle);
|
||||||
|
|
||||||
|
@ -174,8 +175,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
pRsp->qhandle = htobe64((uint64_t)pQInfo);
|
pRsp->qhandle = htobe64((uint64_t)pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (handle != NULL &&
|
||||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
|
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
return pRsp->code;
|
return pRsp->code;
|
||||||
|
@ -233,7 +236,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
pRetrieve->free = htons(pRetrieve->free);
|
pRetrieve->free = htons(pRetrieve->free);
|
||||||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);
|
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
|
||||||
|
pRetrieve->free, pReadMsg->rpcMsg.handle);
|
||||||
|
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
|
@ -259,7 +263,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// register the qhandle to connect to quit query immediate if connection is broken
|
// register the qhandle to connect to quit query immediate if connection is broken
|
||||||
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
|
|
|
@ -61,17 +61,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
|
||||||
|
|
||||||
if (pHead->version == 0) { // from client or CQ
|
if (pHead->version == 0) { // from client or CQ
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
|
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
|
||||||
|
pVnode->status);
|
||||||
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
|
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
|
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
|
||||||
|
pVnode->syncCfg.replica, syncRole[pVnode->role]);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,4 +205,3 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -120,8 +120,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
|
|
||||||
if (pCfg->keep == 1) return pWal;
|
if (pCfg->keep == 1) return pWal;
|
||||||
|
|
||||||
if (walHandleExistingFiles(path) == 0)
|
if (walHandleExistingFiles(path) == 0) walRenew(pWal);
|
||||||
walRenew(pWal);
|
|
||||||
|
|
||||||
if (pWal && pWal->fd < 0) {
|
if (pWal && pWal->fd < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -241,7 +240,10 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||||
if (pHead->version <= pWal->version) return 0;
|
if (pHead->version <= pWal->version) {
|
||||||
|
wError("wal:%s, failed to write ver:%" PRIu64 ", last ver:%" PRIu64, pWal->name, pHead->version, pWal->version);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
pHead->signature = walSignature;
|
pHead->signature = walSignature;
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
||||||
|
@ -258,7 +260,6 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void walFsync(void *handle) {
|
void walFsync(void *handle) {
|
||||||
|
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
|
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
|
||||||
|
|
||||||
|
@ -280,8 +281,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
char opath[TSDB_FILENAME_LEN + 5];
|
char opath[TSDB_FILENAME_LEN + 5];
|
||||||
|
|
||||||
int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
|
int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
|
||||||
if ( pWal->keep == 0)
|
if (pWal->keep == 0) strcpy(opath + slen, "/old");
|
||||||
strcpy(opath+slen, "/old");
|
|
||||||
|
|
||||||
DIR *dir = opendir(opath);
|
DIR *dir = opendir(opath);
|
||||||
if (dir == NULL && errno == ENOENT) return 0;
|
if (dir == NULL && errno == ENOENT) return 0;
|
||||||
|
@ -370,7 +370,6 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void walRelease(SWal *pWal) {
|
static void walRelease(SWal *pWal) {
|
||||||
|
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
pWal->signature = NULL;
|
pWal->signature = NULL;
|
||||||
free(pWal);
|
free(pWal);
|
||||||
|
|
Loading…
Reference in New Issue