Merge pull request #12230 from taosdata/feature/tq

fix(query): column match
This commit is contained in:
Liu Jicong 2022-05-08 00:35:37 +08:00 committed by GitHub
commit 3c64f0f746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 226 additions and 138 deletions

View File

@ -101,8 +101,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;

View File

@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
} }
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
uint32_t numOfRow2); const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
@ -232,7 +232,7 @@ void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid); tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema); SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);

View File

@ -69,20 +69,24 @@ typedef struct {
SUseDbRsp dbInfo; SUseDbRsp dbInfo;
} STaskDispatcherShuffle; } STaskDispatcherShuffle;
typedef void FTbSink(void* vnode, int64_t ver, const SArray* data);
typedef struct { typedef struct {
int8_t reserved; int64_t stbUid;
SSchemaWrapper* pSchemaWrapper; SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder // not applicable to encoder and decoder
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema; STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid SHashObj* pHash; // groupId to tbuid
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef struct { typedef struct {
int64_t smaId; int64_t smaId;
// following are not applicable to encoder and decoder // following are not applicable to encoder and decoder
FSmaHandle* smaHandle; FSmaSink* smaSink;
} STaskSinkSma; } STaskSinkSma;
typedef struct { typedef struct {

View File

@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
int32_t mapIndex = i; int32_t mapIndex = i;
// if (pIndexMap) { // if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); // mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// } // }
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
@ -1596,7 +1596,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
// cal size // cal size
@ -1608,7 +1608,29 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
// TODO min // TODO min
int32_t rowSize = pDataBlock->info.rowSize; int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen; int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
} }
// assign data // assign data
@ -1623,11 +1645,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitBlk* blkHead = submitBlk; SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows); blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->schemaLen = 0;
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
// TODO // TODO
blkHead->suid = 0; blkHead->suid = htobe64(suid);
blkHead->uid = htobe64(pDataBlock->info.uid); // uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
@ -1635,7 +1657,35 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
blkHead->dataLen = 0; blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
STSRow* rowData = blockData;
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
tEncoderClear(&encoder);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0}; SRowBuilder rb = {0};
@ -1656,6 +1706,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
int32_t len = blkHead->dataLen; int32_t len = blkHead->dataLen;
blkHead->dataLen = htonl(len); blkHead->dataLen = htonl(len);
blkHead = POINTER_SHIFT(blkHead, len); blkHead = POINTER_SHIFT(blkHead, len);
/*submitBlk = blkHead;*/
} }
return ret; return ret;

View File

@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(0); ASSERT(0);
} }
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen); pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
ASSERT(pIter->len > 0); ASSERT(pIter->len > 0);
} }

View File

@ -574,6 +574,7 @@ typedef struct {
char sourceDb[TSDB_DB_FNAME_LEN]; char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN]; char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;

View File

@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
/*int32_t outputNameSz = 0;*/ /*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;

View File

@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper);
} }
@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
} }
//
// dispatch // dispatch
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
} }
#endif #endif

View File

@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER; goto _OVER;
} }
stbObj.uid = pStream->targetStbUid;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
return 0; return 0;
@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.createTime = taosGetTimestampMs(); streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime; streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;

View File

@ -884,9 +884,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
} }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { void tqTableSink(void* vnode, int64_t ver, const SArray* data) {
if (pTask->execType == TASK_EXEC__NONE) return 0; //
SVnode* pVnode = (SVnode*)vnode;
// build write msg
//
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
pTask->exec.numOfRunners = parallel; pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) { if (pTask->exec.runners == NULL) {
@ -902,6 +909,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor); ASSERT(pTask->exec.runners[i].executor);
} }
}
if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
}
return 0; return 0;
} }
@ -925,7 +939,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
// sink // sink
pTask->ahandle = pTq->pVnode; pTask->ahandle = pTq->pVnode;
if (pTask->sinkType == TASK_SINK__SMA) { if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle = smaHandleRes; pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) { } else if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

View File

@ -38,7 +38,8 @@
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false; return false;
} }
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pTableScanInfo->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); "-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
// do prepare for the next round table scan operation // do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
@ -407,7 +410,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pInfo->dataReader = pReadHandle; pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
@ -517,11 +520,11 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
TSKEY* ts = (TSKEY*)pColDataInfo->pData; TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) { if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
taosArrayPush(pInfo->tsArray, ts+i); taosArrayPush(pInfo->tsArray, ts + i);
} }
} }
if (taosArrayGetSize(pInfo->tsArray) > 0) { if (taosArrayGetSize(pInfo->tsArray) > 0) {
//TODO(liuyao) get from tsdb // TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT; // p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray); // taosArrayClear(pInfo->tsArray);
@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t* id = taosArrayGet(pColList, i); SColMatchInfo* id = taosArrayGet(pColList, i);
taosArrayPush(pColIds, id); int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
} }
pInfo->pColMatchInfo = pColList; pInfo->pColMatchInfo = pColList;
@ -678,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
return NULL; return NULL;
} }
pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan
if (pInfo->pUpdateInfo == NULL) { if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
@ -701,11 +705,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator; return pOperator;
_error: _error:
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
@ -774,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
if (NULL == pCondition) { if (NULL == pCondition) {
return; return;
} }
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName); nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
} }
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
@ -854,12 +859,12 @@ static SSDataBlock* buildSysTableMetaBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0; size_t size = 0;
const SSysTableMeta *pMeta = NULL; const SSysTableMeta* pMeta = NULL;
getInfosDbMeta(&pMeta, &size); getInfosDbMeta(&pMeta, &size);
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
index = i; index = i;
break; break;
} }
@ -867,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < pMeta[index].colNum; ++i) { for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = {0}; SColumnInfoData colInfoData = {0};
colInfoData.info.colId = i + 1; colInfoData.info.colId = i + 1;
colInfoData.info.type = pMeta[index].schema[i].type; colInfoData.info.type = pMeta[index].schema[i].type;
@ -1101,17 +1106,18 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
// blockDataDestroy(p); todo handle memory leak // blockDataDestroy(p); todo handle memory leak
pInfo->pRes->info.rows = p->info.rows; pInfo->pRes->info.rows = p->info.rows;
return p->info.rows; return p->info.rows;
} }
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) { int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName) {
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t numOfRows = p->info.rows; int32_t numOfRows = p->info.rows;
for(int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
const SSysTableMeta* pm = &pSysDbTableMeta[i]; const SSysTableMeta* pm = &pSysDbTableMeta[i];
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
@ -1132,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false); colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);
for(int32_t j = 4; j <= 8; ++j) { for (int32_t j = 4; j <= 8; ++j) {
pColInfoData = taosArrayGet(p->pDataBlock, j); pColInfoData = taosArrayGet(p->pDataBlock, j);
colDataAppendNULL(pColInfoData, numOfRows); colDataAppendNULL(pColInfoData, numOfRows);
} }
@ -1171,7 +1177,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
tNameAssign(&pInfo->name, pName); tNameAssign(&pInfo->name, pName);
const char* name = tNameGetTableName(&pInfo->name); const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*) readHandle; pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else { } else {
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
@ -1207,8 +1213,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, pOperator->fpSet =
NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
return pOperator; return pOperator;
@ -1355,7 +1361,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { SSDataBlock* pResBlock, SArray* pColMatchInfo,
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {

View File

@ -154,10 +154,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/ /*blockDebugShowData(pRes);*/
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid);
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
// //
} else if (pTask->sinkType == TASK_SINK__FETCH) { } else if (pTask->sinkType == TASK_SINK__FETCH) {
// //
@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
} }
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
} }
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;