Merge pull request #4477 from taosdata/feature/td-2310
[TD-2310]<feature>: add dest table into show streams
This commit is contained in:
commit
625cc5a6e3
|
@ -382,6 +382,7 @@ typedef struct SSqlObj {
|
|||
|
||||
typedef struct SSqlStream {
|
||||
SSqlObj *pSql;
|
||||
const char* dstTable;
|
||||
uint32_t streamId;
|
||||
char listed;
|
||||
bool isProject;
|
||||
|
@ -408,6 +409,8 @@ typedef struct SSqlStream {
|
|||
struct SSqlStream *prev, *next;
|
||||
} SSqlStream;
|
||||
|
||||
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
|
||||
|
||||
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
|
||||
void tscInitMsgsFp();
|
||||
|
||||
|
|
|
@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
|||
SSqlStream *pStream = pObj->streamList;
|
||||
while (pStream) {
|
||||
tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql));
|
||||
if (pStream->dstTable == NULL) {
|
||||
pSdesc->dstTable[0] = 0;
|
||||
} else {
|
||||
tstrncpy(pSdesc->dstTable, pStream->dstTable, sizeof(pSdesc->dstTable));
|
||||
}
|
||||
pSdesc->streamId = htonl(pStream->streamId);
|
||||
pSdesc->num = htobe64(pStream->num);
|
||||
|
||||
|
|
|
@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
|||
pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
|
||||
}
|
||||
|
||||
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
|
||||
pStream->dstTable = dstTable;
|
||||
}
|
||||
|
||||
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *)) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
|
|
|
@ -57,6 +57,7 @@ typedef struct SCqObj {
|
|||
uint64_t uid;
|
||||
int32_t tid; // table ID
|
||||
int32_t rowSize; // bytes of a row
|
||||
char * dstTable;
|
||||
char * sqlStr; // SQL string
|
||||
STSchema * pSchema; // pointer to schema array
|
||||
void * pStream;
|
||||
|
@ -185,7 +186,7 @@ void cqStop(void *handle) {
|
|||
pthread_mutex_unlock(&pContext->mutex);
|
||||
}
|
||||
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) {
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) {
|
||||
if (tsEnableStream == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *
|
|||
if (pObj == NULL) return NULL;
|
||||
|
||||
pObj->uid = uid;
|
||||
pObj->tid = tid;
|
||||
pObj->sqlStr = malloc(strlen(sqlStr)+1);
|
||||
strcpy(pObj->sqlStr, sqlStr);
|
||||
pObj->tid = sid;
|
||||
if (dstTable != NULL) {
|
||||
pObj->dstTable = strdup(dstTable);
|
||||
}
|
||||
pObj->sqlStr = strdup(sqlStr);
|
||||
|
||||
pObj->pSchema = tdDupSchema(pSchema);
|
||||
pObj->rowSize = schemaTLen(pSchema);
|
||||
|
@ -247,6 +250,7 @@ void cqDrop(void *handle) {
|
|||
|
||||
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
tdFreeSchema(pObj->pSchema);
|
||||
free(pObj->dstTable);
|
||||
free(pObj->sqlStr);
|
||||
free(pObj);
|
||||
|
||||
|
@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
|||
if (pObj->pStream == NULL) {
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
|
||||
pContext->num++;
|
||||
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
|
|
|
@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
|
|||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
|
||||
for (int sid =1; sid<10; ++sid) {
|
||||
cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
|
||||
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
|
||||
}
|
||||
|
||||
tdFreeSchema(pSchema);
|
||||
|
|
|
@ -787,6 +787,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char sql[TSDB_SHOW_SQL_LEN];
|
||||
char dstTable[TSDB_TABLE_NAME_LEN];
|
||||
uint32_t streamId;
|
||||
int64_t num; // number of computing/cycles
|
||||
int64_t useconds;
|
||||
|
|
|
@ -42,7 +42,7 @@ void cqStart(void *handle);
|
|||
void cqStop(void *handle);
|
||||
|
||||
// cqCreate is called by TSDB to start an instance of CQ
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema);
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
|
||||
|
||||
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
||||
void cqDrop(void *handle);
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct {
|
|||
void *cqH;
|
||||
int (*notifyStatus)(void *, int status, int eno);
|
||||
int (*eventCallBack)(void *);
|
||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
|
||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
|
||||
void (*cqDropFunc)(void *handle);
|
||||
} STsdbAppH;
|
||||
|
||||
|
|
|
@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
|
|||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "dest table");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
|
@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
|
|||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
|
||||
|
|
|
@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
|
|||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
|
||||
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue