Merge pull request #5667 from taosdata/hotfix/TD-3656
[TD-3656]duplicated CQ for vnode with multiple replica
This commit is contained in:
commit
edeaa56964
|
@ -294,7 +294,7 @@ void cqStop(void *handle) {
|
|||
pthread_mutex_unlock(&pContext->mutex);
|
||||
}
|
||||
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) {
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) {
|
||||
if (tsEnableStream == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -326,7 +326,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
|
|||
|
||||
pObj->rid = taosAddRef(cqObjRef, pObj);
|
||||
|
||||
cqCreateStream(pContext, pObj);
|
||||
if(start && pContext->master) {
|
||||
cqCreateStream(pContext, pObj);
|
||||
} else {
|
||||
pObj->pContext = pContext;
|
||||
}
|
||||
|
||||
rid = pObj->rid;
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
|
|||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
|
||||
for (int sid =1; sid<10; ++sid) {
|
||||
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
|
||||
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema, 1);
|
||||
}
|
||||
|
||||
tdFreeSchema(pSchema);
|
||||
|
|
|
@ -42,7 +42,7 @@ void cqStart(void *handle);
|
|||
void cqStop(void *handle);
|
||||
|
||||
// cqCreate is called by TSDB to start an instance of CQ
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start);
|
||||
|
||||
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
||||
void cqDrop(void *handle);
|
||||
|
|
|
@ -51,7 +51,7 @@ typedef struct {
|
|||
void *cqH;
|
||||
int (*notifyStatus)(void *, int status, int eno);
|
||||
int (*eventCallBack)(void *);
|
||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema);
|
||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start);
|
||||
void (*cqDropFunc)(void *handle);
|
||||
} STsdbAppH;
|
||||
|
||||
|
|
|
@ -526,7 +526,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
|
|||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -619,4 +619,4 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
|
||||
tsdbDestroyReadH(&readh);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -840,7 +840,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1), 1);
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
|
@ -1322,4 +1322,4 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) {
|
|||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue