Merge pull request #4390 from taosdata/feature/wal

TD-2283
This commit is contained in:
Shengliang Guan 2020-11-30 18:37:00 +08:00 committed by GitHub
commit 71bed5816a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 6 additions and 8 deletions

View File

@ -40,15 +40,14 @@
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
void *ahandle;
int32_t num; // number of continuous streams
struct SCqObj *pHead; struct SCqObj *pHead;
void *dbConn; void *dbConn;
int32_t master;
void *tmrCtrl; void *tmrCtrl;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SCqContext; } SCqContext;
@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
tstrncpy(pContext->db, db, sizeof(pContext->db)); tstrncpy(pContext->db, db, sizeof(pContext->db));
pContext->vgId = pCfg->vgId; pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite; pContext->cqWrite = pCfg->cqWrite;
pContext->ahandle = ahandle;
tscEmbedded = 1; tscEmbedded = 1;
pthread_mutex_init(&pContext->mutex, NULL); pthread_mutex_init(&pContext->mutex, NULL);
@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead->version = 0; pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL); pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer); free(buffer);
} }

View File

@ -24,7 +24,7 @@
int64_t ver = 0; int64_t ver = 0;
void *pCq = NULL; void *pCq = NULL;
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) {
return 0; return 0;
} }

View File

@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h" #include "tdataformat.h"
typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;

View File

@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToWQueue; cqCfg.cqWrite = vnodeWriteToCache;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);