TD-449: fix several bugs
This commit is contained in:
parent
fe300f3c64
commit
d34cabf2c1
|
@ -38,6 +38,7 @@ typedef struct {
|
|||
int vgId;
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
FCqWrite cqWrite;
|
||||
void *ahandle;
|
||||
int num; // number of continuous streams
|
||||
|
@ -73,6 +74,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
|||
|
||||
strcpy(pContext->user, pCfg->user);
|
||||
strcpy(pContext->pass, pCfg->pass);
|
||||
strcpy(pContext->db, pCfg->db);
|
||||
pContext->vgId = pCfg->vgId;
|
||||
pContext->cqWrite = pCfg->cqWrite;
|
||||
pContext->ahandle = ahandle;
|
||||
|
@ -207,9 +209,8 @@ void cqDrop(void *handle) {
|
|||
}
|
||||
|
||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||
|
||||
if (pContext->dbConn == NULL) {
|
||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
|
||||
if (pContext->dbConn == NULL) {
|
||||
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||
}
|
||||
|
@ -217,6 +218,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
|||
}
|
||||
|
||||
int64_t lastKey = 0;
|
||||
pObj->pContext = pContext;
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
pContext->num++;
|
||||
|
|
|
@ -22,7 +22,8 @@ extern "C" {
|
|||
|
||||
int32_t dnodeInitModules();
|
||||
void dnodeStartModules();
|
||||
void dnodeCleanupModules();
|
||||
void dnodeStartStream();
|
||||
void dnodeCleanUpModules();
|
||||
void dnodeProcessModuleStatus(uint32_t moduleStatus);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -124,6 +124,7 @@ int32_t dnodeInitSystem() {
|
|||
|
||||
dnodeStartModules();
|
||||
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
|
||||
dnodeStartStream();
|
||||
|
||||
dPrint("TDengine is initialized successfully");
|
||||
|
||||
|
|
|
@ -260,11 +260,27 @@ static int32_t dnodeOpenVnodes() {
|
|||
}
|
||||
|
||||
free(vnodeList);
|
||||
|
||||
dPrint("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void dnodeStartStream() {
|
||||
int32_t vnodeList[TSDB_MAX_VNODES];
|
||||
int32_t numOfVnodes = 0;
|
||||
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
|
||||
|
||||
if (status != TSDB_CODE_SUCCESS) {
|
||||
dPrint("Get dnode list failed");
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
vnodeStartStream(vnodeList[i]);
|
||||
}
|
||||
|
||||
dPrint("streams started");
|
||||
}
|
||||
|
||||
static void dnodeCloseVnodes() {
|
||||
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
|
||||
int32_t numOfVnodes;
|
||||
|
|
|
@ -27,6 +27,7 @@ typedef struct {
|
|||
int vgId;
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
FCqWrite cqWrite;
|
||||
} SCqCfg;
|
||||
|
||||
|
|
|
@ -118,6 +118,7 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
|
|||
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
|
||||
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg);
|
||||
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
|
||||
void tsdbStartStream(TsdbRepoT *repo);
|
||||
|
||||
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ typedef struct {
|
|||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeDrop(int32_t vgId);
|
||||
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||
int32_t vnodeStartStream(int32_t vgId);
|
||||
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
|
|
|
@ -474,6 +474,18 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
|
|||
return TSDB_GET_TABLE_LAST_KEY(pTable);
|
||||
}
|
||||
|
||||
void tsdbStartStream(TsdbRepoT *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
|
||||
// TODO
|
||||
return NULL;
|
||||
|
|
|
@ -150,7 +150,6 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
|||
|
||||
void tsdbOrgMeta(void *pHandle) {
|
||||
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
|
||||
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
|
||||
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
|
@ -158,13 +157,6 @@ void tsdbOrgMeta(void *pHandle) {
|
|||
tsdbAddTableIntoIndex(pMeta, pTable);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -208,8 +208,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
}
|
||||
|
||||
SCqCfg cqCfg = {0};
|
||||
sprintf(cqCfg.user, "root");
|
||||
sprintf(cqCfg.user, "_root");
|
||||
strcpy(cqCfg.pass, tsInternalPass);
|
||||
strcpy(cqCfg.db, "s1_db0"); // TODO: replace hard coded db name
|
||||
cqCfg.vgId = vnode;
|
||||
cqCfg.cqWrite = vnodeWriteToQueue;
|
||||
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||
|
@ -277,6 +278,15 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeStartStream(int32_t vnode) {
|
||||
SVnodeObj* pVnode = vnodeAccquireVnode(vnode);
|
||||
if (pVnode != NULL) {
|
||||
tsdbStartStream(pVnode->tsdb);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeClose(int32_t vgId) {
|
||||
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
|
||||
if (ppVnode == NULL || *ppVnode == NULL) return 0;
|
||||
|
|
Loading…
Reference in New Issue