diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 9e1628bb9b..3c7cd3e1eb 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -19,6 +19,7 @@ #include "tscLog.h" #include "tscUtil.h" #include "tsched.h" +#include "tcache.h" #include "tsclient.h" #include "ttime.h" #include "ttimer.h" @@ -147,7 +148,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf retryDelay); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); - tscClearTableMetaInfo(pTableMetaInfo, true); + taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), true); + tfree(pTableMetaInfo->vgroupList); tscSetRetryTimer(pStream, pStream->pSql, retryDelay); return; @@ -259,7 +261,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf pStream->numOfRes); // release the metric/meter meta information reference, so data in cache can be updated - tscClearTableMetaInfo(pTableMetaInfo, false); + + taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); + tfree(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 3a84ce8f6f..e706f9a9d5 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -70,6 +70,7 @@ typedef struct { int numOfCols; // Number of columns appended int tlen; // maximum length of a SDataRow without the header part int flen; // First part length in a SDataRow after the header part + int32_t version; STColumn columns[]; } STSchema; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 799a0b0514..d96e0fe6a7 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -49,7 +49,8 @@ typedef struct { } SCqContext; typedef struct SCqObj { - int tid; // table ID + uint64_t uid; + int32_t tid; // table ID int rowSize; // bytes of a row char * sqlStr; // SQL string STSchema * pSchema; // pointer to schema array @@ -155,17 +156,19 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) { +void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); if (pObj == NULL) return NULL; + pObj->uid = uid; pObj->tid = tid; pObj->sqlStr = malloc(strlen(sqlStr)+1); strcpy(pObj->sqlStr, sqlStr); pObj->pSchema = tdDupSchema(pSchema); + pObj->rowSize = pSchema->tlen; cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); @@ -213,8 +216,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); if (pContext->dbConn == NULL) { cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + return; } - return; } int64_t lastKey = 0; @@ -231,6 +234,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqObj *pObj = (SCqObj *)param; SCqContext *pContext = pObj->pContext; + STSchema *pSchema = pObj->pSchema; if (pObj->pStream == NULL) return; cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); @@ -240,18 +244,38 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; + SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead)); + SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); + + int32_t flen = 0; + for (int32_t i = 0; i < pSchema->numOfCols; i++) { + flen += TYPE_BYTES[pSchema->columns[i].type]; + } + + SDataRow trow = (SDataRow)pBlk->data; + dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); + + int toffset = 0; + for (int32_t i = 0; i < pSchema->numOfCols; i++) { + tdAppendColVal(trow, row[i], pSchema->columns[i].type, pSchema->columns[i].bytes, toffset); + toffset += TYPE_BYTES[pSchema->columns[i].type]; + } + pBlk->len = htonl(dataRowLen(trow)); + + pBlk->uid = htobe64(pObj->uid); + pBlk->tid = htonl(pObj->tid); + pBlk->numOfRows = htons(1); + pBlk->sversion = htonl(pSchema->version); + pBlk->padding = 0; + + pMsg->header.vgId = htonl(pContext->vgId); + pMsg->header.contLen = htonl(size - sizeof(SWalHead)); + pMsg->length = pMsg->header.contLen; + pMsg->numOfBlocks = htonl(1); + pHead->msgType = TSDB_MSG_TYPE_SUBMIT; pHead->len = size - sizeof(SWalHead); - - SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead)); - // to do: fill in the SSubmitMsg structure - pSubmit->numOfBlocks = 1; - - - SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); - // to do: fill in the SSubmitBlk strucuture - pBlk->tid = pObj->tid; - + pHead->version = 0; // write into vnode write queue pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index fbe3c95b86..1416a591be 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { tdDestroyTSchemaBuilder(&schemaBuilder); for (int sid =1; sid<10; ++sid) { - cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); + cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); } tdFreeSchema(pSchema); diff --git a/src/inc/tcq.h b/src/inc/tcq.h index ba198ab66e..32b75674c3 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -42,7 +42,7 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema); +void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d3ec96acf6..a678f213bb 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -43,7 +43,7 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status); int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema); + void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); void (*cqDropFunc)(void *handle); void *(*configFunc)(int32_t vgId, int32_t sid); } STsdbAppH; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 632c1dd34e..f8fea779f3 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -481,7 +481,7 @@ void tsdbStartStream(TsdbRepoT *repo) { for (int i = 0; i < pRepo->config.maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); } } } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index bf5fac4e45..ac0f6b26ac 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -675,7 +675,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { tsdbAddTableIntoIndex(pMeta, pTable); } if (pTable->type == TSDB_STREAM_TABLE && addIdx) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); } pMeta->nTables++; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 037dadb8fd..26ba392a76 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -210,7 +210,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "_root"); strcpy(cqCfg.pass, tsInternalPass); - strcpy(cqCfg.db, "s1_db0"); // TODO: replace hard coded db name + strcpy(cqCfg.db, "db"); // TODO: replace hard coded db name cqCfg.vgId = vnode; cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg);