Merge pull request #3891 from taosdata/bugfix/td-1709
[TD-1709]<fix>: stream crash in cluster
This commit is contained in:
commit
5acc9c2873
|
@ -4,6 +4,7 @@ PROJECT(TDengine)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
|
||||||
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
|
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(inc)
|
INCLUDE_DIRECTORIES(inc)
|
||||||
|
|
|
@ -24,8 +24,10 @@
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
#include "tsync.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "dnodeInt.h"
|
#include "dnodeInt.h"
|
||||||
|
#include "syncInt.h"
|
||||||
#include "dnodeVWrite.h"
|
#include "dnodeVWrite.h"
|
||||||
#include "dnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
|
|
||||||
|
@ -239,6 +241,10 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
pHead->len = pWrite->contLen;
|
pHead->len = pWrite->contLen;
|
||||||
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
|
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
|
||||||
taosMsg[pWrite->rpcMsg.msgType]);
|
taosMsg[pWrite->rpcMsg.msgType]);
|
||||||
|
} else if (type == TAOS_QTYPE_CQ) {
|
||||||
|
pHead = (SWalHead *)((char*)item + sizeof(SSyncHead));
|
||||||
|
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
||||||
|
pHead->version);
|
||||||
} else {
|
} else {
|
||||||
pHead = (SWalHead *)item;
|
pHead = (SWalHead *)item;
|
||||||
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
||||||
|
|
|
@ -4,6 +4,7 @@ PROJECT(TDengine)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
|
||||||
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
|
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
|
||||||
INCLUDE_DIRECTORIES(inc)
|
INCLUDE_DIRECTORIES(inc)
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
|
@ -62,6 +62,7 @@ typedef struct {
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
||||||
|
int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type);
|
||||||
void vnodeInitWriteFp(void);
|
void vnodeInitWriteFp(void);
|
||||||
void vnodeInitReadFp(void);
|
void vnodeInitReadFp(void);
|
||||||
|
|
||||||
|
|
|
@ -259,7 +259,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
strcpy(cqCfg.pass, tsInternalPass);
|
strcpy(cqCfg.pass, tsInternalPass);
|
||||||
strcpy(cqCfg.db, pVnode->db);
|
strcpy(cqCfg.db, pVnode->db);
|
||||||
cqCfg.vgId = vnode;
|
cqCfg.vgId = vnode;
|
||||||
cqCfg.cqWrite = vnodeWriteToQueue;
|
cqCfg.cqWrite = vnodeWriteCqMsgToQueue;
|
||||||
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||||
if (pVnode->cq == NULL) {
|
if (pVnode->cq == NULL) {
|
||||||
vnodeCleanUp(pVnode);
|
vnodeCleanUp(pVnode);
|
||||||
|
|
|
@ -22,9 +22,11 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
|
#include "tsync.h"
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
#include "syncInt.h"
|
||||||
#include "tcq.h"
|
#include "tcq.h"
|
||||||
|
|
||||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
||||||
|
@ -189,6 +191,25 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
|
||||||
|
SVnodeObj *pVnode = param;
|
||||||
|
SWalHead * pHead = data;
|
||||||
|
|
||||||
|
int size = sizeof(SWalHead) + pHead->len;
|
||||||
|
SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead));
|
||||||
|
SWalHead *pWal = (SWalHead *)(pSync + 1);
|
||||||
|
memcpy(pWal, pHead, size);
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
|
||||||
|
|
||||||
|
taosWriteQitem(pVnode->wqueue, type, pSync);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, void *data, int type) {
|
int vnodeWriteToQueue(void *param, void *data, int type) {
|
||||||
SVnodeObj *pVnode = param;
|
SVnodeObj *pVnode = param;
|
||||||
SWalHead * pHead = data;
|
SWalHead * pHead = data;
|
||||||
|
|
Loading…
Reference in New Issue