feat(stream): support define table name

This commit is contained in:
Liu Jicong 2022-10-09 16:54:27 +08:00
parent 0575cc26b1
commit 2f9aeeb5fc
26 changed files with 357 additions and 289 deletions

View File

@ -157,7 +157,7 @@ typedef struct SDataBlockInfo {
int32_t rowSize; int32_t rowSize;
uint64_t uid; // the uid of table, from which current data block comes uint64_t uid; // the uid of table, from which current data block comes
uint16_t blockId; // block id, generated by physical planner uint16_t blockId; // block id, generated by physical planner
uint64_t groupId; // no need to serialize uint64_t groupId;
int16_t hasVarCol; int16_t hasVarCol;
uint32_t capacity; uint32_t capacity;
// TODO: optimize and remove following // TODO: optimize and remove following
@ -166,6 +166,8 @@ typedef struct SDataBlockInfo {
EStreamType type; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark; // used for stream TSKEY watermark; // used for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {

View File

@ -235,6 +235,7 @@ void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createSpecialDataBlock(EStreamType type); SSDataBlock* createSpecialDataBlock(EStreamType type);
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);

View File

@ -676,7 +676,6 @@ typedef struct {
col_id_t colId; col_id_t colId;
int16_t slotId; int16_t slotId;
}; };
bool output; // TODO remove it later
int8_t type; int8_t type;
int32_t bytes; int32_t bytes;
@ -1397,6 +1396,7 @@ typedef struct {
int64_t ekey; int64_t ekey;
int64_t version; // for stream int64_t version; // for stream
TSKEY watermark; // for stream TSKEY watermark; // for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // for stream
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
@ -2025,7 +2025,7 @@ typedef struct SVCreateTbReq {
int8_t type; int8_t type;
union { union {
struct { struct {
char* name; // super table name char* stbName; // super table name
uint8_t tagNum; uint8_t tagNum;
tb_uid_t suid; tb_uid_t suid;
SArray* tagName; SArray* tagName;
@ -2045,7 +2045,7 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear(req->comment); taosMemoryFreeClear(req->comment);
if (req->type == TSDB_CHILD_TABLE) { if (req->type == TSDB_CHILD_TABLE) {
taosMemoryFreeClear(req->ctb.pTag); taosMemoryFreeClear(req->ctb.pTag);
taosMemoryFreeClear(req->ctb.name); taosMemoryFreeClear(req->ctb.stbName);
taosArrayDestroy(req->ctb.tagName); taosArrayDestroy(req->ctb.tagName);
req->ctb.tagName = NULL; req->ctb.tagName = NULL;
} else if (req->type == TSDB_NORMAL_TABLE) { } else if (req->type == TSDB_NORMAL_TABLE) {

View File

@ -89,13 +89,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
*/ */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
*
* @param tinfo
*/
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
* *

View File

@ -227,7 +227,7 @@ _err:
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
STag* pTag = (STag*)pCreateReq->ctb.pTag; STag* pTag = (STag*)pCreateReq->ctb.pTag;
char* sname = pCreateReq->ctb.name; char* sname = pCreateReq->ctb.stbName;
char* name = pCreateReq->name; char* name = pCreateReq->name;
SArray* tagName = pCreateReq->ctb.tagName; SArray* tagName = pCreateReq->ctb.tagName;
int64_t id = pCreateReq->uid; int64_t id = pCreateReq->uid;
@ -355,7 +355,8 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson(req.pReqs, req.nReqs); string = buildCreateCTableJson(req.pReqs, req.nReqs);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) { } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); string =
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
} }
} }
@ -828,10 +829,10 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SName sName = {0}; SName sName = {0};
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.name, &sName); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.name); uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.stbName);
goto end; goto end;
} }
@ -1765,7 +1766,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
if (strcmp(tbName, pCreateReq.name) == 0) { if (strcmp(tbName, pCreateReq.name) == 0) {
schemaLen = *lenTmp; schemaLen = *lenTmp;
schemaData = *dataTmp; schemaData = *dataTmp;
strcpy(pName.tname, pCreateReq.ctb.name); strcpy(pName.tname, pCreateReq.ctb.stbName);
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment); taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName); taosArrayDestroy(pCreateReq.ctb.tagName);

View File

@ -1346,6 +1346,43 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
return pBlock; return pBlock;
} }
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
if (pDataBlock == NULL) {
return NULL;
}
SSDataBlock* pBlock = createDataBlock();
pBlock->info = pDataBlock->info;
pBlock->info.rows = 0;
pBlock->info.capacity = 0;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
blockDataAppendColInfo(pBlock, &colInfo);
}
int32_t code = blockDataEnsureCapacity(pBlock, 1);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pBlock);
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
void* pData = colDataGetData(pSrc, rowIdx);
bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
colDataAppend(pDst, 0, pData, isNull);
}
pBlock->info.rows = 1;
return pBlock;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return NULL; return NULL;

View File

@ -5075,7 +5075,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
} }
if (pReq->type == TSDB_CHILD_TABLE) { if (pReq->type == TSDB_CHILD_TABLE) {
if (tEncodeCStr(pCoder, pReq->ctb.name) < 0) return -1; if (tEncodeCStr(pCoder, pReq->ctb.stbName) < 0) return -1;
if (tEncodeU8(pCoder, pReq->ctb.tagNum) < 0) return -1; if (tEncodeU8(pCoder, pReq->ctb.tagNum) < 0) return -1;
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1;
@ -5112,7 +5112,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
} }
if (pReq->type == TSDB_CHILD_TABLE) { if (pReq->type == TSDB_CHILD_TABLE) {
if (tDecodeCStr(pCoder, &pReq->ctb.name) < 0) return -1; if (tDecodeCStr(pCoder, &pReq->ctb.stbName) < 0) return -1;
if (tDecodeU8(pCoder, &pReq->ctb.tagNum) < 0) return -1; if (tDecodeU8(pCoder, &pReq->ctb.tagNum) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1;

View File

@ -639,6 +639,7 @@ typedef struct {
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>> SArray* tasks; // SArray<SArray<SStreamTask>>
SSchemaWrapper outputSchema; SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);

View File

@ -343,6 +343,20 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
goto FAIL; goto FAIL;
} }
pObj->tagSchema.nCols = pCreate->numOfTags;
if (pCreate->numOfTags) {
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
}
ASSERT(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));
for (int32_t i = 0; i < pCreate->numOfTags; i++) {
SField *pField = taosArrayGet(pCreate->pTags, i);
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
pObj->tagSchema.pSchema[i].bytes = pField->bytes;
pObj->tagSchema.pSchema[i].flags = pField->flags;
pObj->tagSchema.pSchema[i].type = pField->type;
memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
}
FAIL: FAIL:
if (pAst != NULL) nodesDestroyNode(pAst); if (pAst != NULL) nodesDestroyNode(pAst);
if (pPlan != NULL) qDestroyQueryPlan(pPlan); if (pPlan != NULL) qDestroyQueryPlan(pPlan);

View File

@ -248,7 +248,8 @@ static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
} }
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet){ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
SSnapContext** ctxRet) {
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if (ctx == NULL) return -1; if (ctx == NULL) return -1;
*ctxRet = ctx; *ctxRet = ctx;
@ -288,7 +289,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
continue; continue;
} }
if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) < 0) { // check if table exist for now, need optimize later if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
0) { // check if table exist for now, need optimize later
continue; continue;
} }
@ -338,8 +340,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
} }
} }
if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
saveSuperTableInfoForChildTable(&me, ctx->suidInfo); saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
} }
tDecoderClear(&dc); tDecoderClear(&dc);
@ -350,7 +352,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t)); SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
ASSERT(idData); ASSERT(idData);
idData->index = i; idData->index = i;
metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index); metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
idData->index);
} }
tdbFree(pKey); tdbFree(pKey);
@ -481,8 +484,8 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
metaDecodeEntry(&dc, &me); metaDecodeEntry(&dc, &me);
metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1); metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
req.name = me.name; req.name = me.name;
req.suid = me.uid; req.suid = me.uid;
@ -494,9 +497,10 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
ret = buildSuperTableInfo(&req, pBuf, contLen); ret = buildSuperTableInfo(&req, pBuf, contLen);
*type = TDMT_VND_CREATE_STB; *type = TDMT_VND_CREATE_STB;
} else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); STableInfoForChildTable* data =
(STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
ASSERT(data); ASSERT(data);
SVCreateTbReq req = {0}; SVCreateTbReq req = {0};
@ -506,7 +510,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
req.commentLen = -1; req.commentLen = -1;
req.ctb.suid = me.ctbEntry.suid; req.ctb.suid = me.ctbEntry.suid;
req.ctb.tagNum = data->tagRow->nCols; req.ctb.tagNum = data->tagRow->nCols;
req.ctb.name = data->tableName; req.ctb.stbName = data->tableName;
SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN); SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
STag* p = (STag*)me.ctbEntry.pTags; STag* p = (STag*)me.ctbEntry.pTags;
@ -598,7 +602,8 @@ SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){
metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1); metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) { if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); STableInfoForChildTable* data =
(STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
result.uid = me.uid; result.uid = me.uid;
result.suid = me.ctbEntry.suid; result.suid = me.ctbEntry.suid;
result.schema = tCloneSSchemaWrapper(data->schemaRow); result.schema = tCloneSSchemaWrapper(data->schemaRow);
@ -613,7 +618,8 @@ SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){
tDecoderClear(&dc); tDecoderClear(&dc);
break; break;
} else if (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) { } else if (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); STableInfoForChildTable* data =
(STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
result.uid = me.uid; result.uid = me.uid;
result.suid = me.ctbEntry.suid; result.suid = me.ctbEntry.suid;
strcpy(result.tbName, me.name); strcpy(result.tbName, me.name);

View File

@ -382,7 +382,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
} }
if (pReq->type == TSDB_CHILD_TABLE) { if (pReq->type == TSDB_CHILD_TABLE) {
tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.name); tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.stbName);
if (suid != pReq->ctb.suid) { if (suid != pReq->ctb.suid) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1; return -1;

View File

@ -327,7 +327,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SReadHandle handle = { SReadHandle handle = {
.meta = pVnode->pMeta, .meta = pVnode->pMeta,
.vnode = pVnode, .vnode = pVnode,
@ -1821,11 +1820,9 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
goto _err; goto _err;
} }
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
tdCleanupStreamInputDataBlock(taskInfo);
goto _err; goto _err;
} }
tdCleanupStreamInputDataBlock(taskInfo);
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished", smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished",
SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
} else { } else {

View File

@ -48,8 +48,9 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) { SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL; SArray* schemaReqSz = NULL;
@ -116,13 +117,19 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// } // }
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.ctb.name = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
createTbReq.flags = 0; createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
if (pDataBlock->info.parTbName[0]) {
createTbReq.name = strdup(pDataBlock->info.parTbName);
} else {
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
}
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
createTbReq.ctb.pTag = (uint8_t*)pTag; createTbReq.ctb.pTag = (uint8_t*)pTag;
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
@ -261,8 +268,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid, SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true,
pTask->tbSink.stbFullName, &deleteReq); pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);

View File

@ -74,6 +74,7 @@ typedef struct SResultRowPosition {
typedef struct SResKeyPos { typedef struct SResKeyPos {
SResultRowPosition pos; SResultRowPosition pos;
uint64_t groupId; uint64_t groupId;
// char parTbName[TSDB_TABLE_NAME_LEN];
char key[]; char key[];
} SResKeyPos; } SResKeyPos;
@ -123,6 +124,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type); int32_t type);
void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId);
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);

View File

@ -460,6 +460,8 @@ typedef struct SPartitionBySupporter {
typedef struct SPartitionDataInfo { typedef struct SPartitionDataInfo {
uint64_t groupId; uint64_t groupId;
char* tbname;
SArray* tags;
SArray* rowIds; SArray* rowIds;
} SPartitionDataInfo; } SPartitionDataInfo;
@ -617,6 +619,7 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pChildren; SArray* pChildren;
SStreamState* pState; SStreamState* pState;
SWinKey delKey; SWinKey delKey;
SHashObj* pGroupIdTbNameMap; // uint64_t -> char[TSDB_TABLE_NAME_LEN]
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
@ -771,6 +774,7 @@ typedef struct SStreamPartitionOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SPartitionBySupporter partitionSup; SPartitionBySupporter partitionSup;
SExprSupp scalarSup; SExprSupp scalarSup;
SExprSupp tbnameCalSup;
SHashObj* pPartitions; SHashObj* pPartitions;
void* parIte; void* parIte;
SSDataBlock* pInputDataBlock; SSDataBlock* pInputDataBlock;
@ -1085,7 +1089,7 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
bool groupbyTbname(SNodeList* pGroupList); bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);

View File

@ -1124,45 +1124,45 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa
return pCol; return pCol;
} }
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode)); pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
pExp->pExpr->_function.num = 1; pExp->pExpr->_function.num = 1;
pExp->pExpr->_function.functionId = -1; pExp->pExpr->_function.functionId = -1;
int32_t type = nodeType(pTargetNode->pExpr); int32_t type = nodeType(pNode);
// it is a project query, or group by column // it is a project query, or group by column
if (type == QUERY_NODE_COLUMN) { if (type == QUERY_NODE_COLUMN) {
pExp->pExpr->nodeType = QUERY_NODE_COLUMN; pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr; SColumnNode* pColNode = (SColumnNode*)pNode;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1; pExp->base.numOfParams = 1;
SDataType* pType = &pColNode->node.resType; SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pExp->base.resSchema =
pType->precision, pColNode->colName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = pExp->base.pParam[0].pCol =
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType); createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (type == QUERY_NODE_VALUE) { } else if (type == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE; pExp->pExpr->nodeType = QUERY_NODE_VALUE;
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr; SValueNode* pValNode = (SValueNode*)pNode;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1; pExp->base.numOfParams = 1;
SDataType* pType = &pValNode->node.resType; SDataType* pType = &pValNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pExp->base.resSchema =
pType->precision, pValNode->node.aliasName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param); nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
} else if (type == QUERY_NODE_FUNCTION) { } else if (type == QUERY_NODE_FUNCTION) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
SDataType* pType = &pFuncNode->node.resType; SDataType* pType = &pFuncNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pExp->base.resSchema =
pType->precision, pFuncNode->node.aliasName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.functionId = pFuncNode->funcId;
pExp->pExpr->_function.pFunctNode = pFuncNode; pExp->pExpr->_function.pFunctNode = pFuncNode;
@ -1204,20 +1204,24 @@ void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
} }
} else if (type == QUERY_NODE_OPERATOR) { } else if (type == QUERY_NODE_OPERATOR) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr; SOperatorNode* pOpNode = (SOperatorNode*)pNode;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1; pExp->base.numOfParams = 1;
SDataType* pType = &pNode->node.resType; SDataType* pType = &pOpNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pExp->base.resSchema =
pType->precision, pNode->node.aliasName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; pExp->pExpr->_optrRoot.pRootNode = pNode;
} else { } else {
ASSERT(0); ASSERT(0);
} }
} }
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
}
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfFuncs = LIST_LENGTH(pNodeList);
int32_t numOfGroupKeys = 0; int32_t numOfGroupKeys = 0;

View File

@ -49,17 +49,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
#if 0
// TODO: if a block was set but not consumed,
// prevent setting a different type of block
pInfo->validBlockIndex = 0;
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);
} else {
taosArrayClear(pInfo->pBlockLists);
}
#endif
ASSERT(pInfo->validBlockIndex == 0); ASSERT(pInfo->validBlockIndex == 0);
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
@ -71,30 +60,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
/*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/
/*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/
/*return TSDB_CODE_QRY_APP_ERROR;*/
/*}*/
ASSERT(numOfBlocks == 1); ASSERT(numOfBlocks == 1);
/*if (numOfBlocks == 1) {*/
taosArrayPush(pInfo->pBlockLists, &input); taosArrayPush(pInfo->pBlockLists, &input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
/*} else {*/
/*}*/
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock); taosArrayPush(pInfo->pBlockLists, &pDataBlock);
#if 0
// TODO optimize
SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info;
taosArrayClear(p->pDataBlock);
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p);
#endif
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
@ -107,27 +79,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
#if 0
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
return;
}
SOperatorInfo* pOptrInfo = pTaskInfo->pRoot->pDownstream[0];
if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOptrInfo->info;
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory);
} else {
ASSERT(0);
}
} else {
ASSERT(0);
}
#endif
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
@ -330,8 +281,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
} }
bool exists = false;
#if 0 #if 0
bool exists = false;
for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) { for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) {
STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k); STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k);
if (pKeyInfo->uid == keyInfo.uid) { if (pKeyInfo->uid == keyInfo.uid) {
@ -339,13 +290,14 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
exists = true; exists = true;
} }
} }
#endif
if (!exists) { if (!exists) {
#endif
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
} }
}
/*}*/
if (keyBuf != NULL) { if (keyBuf != NULL) {
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);

View File

@ -4214,8 +4214,9 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) { SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprInfo* pExprInfo = pSup->pExprInfo; SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs; int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
@ -4244,6 +4245,11 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pStat
if (pBlock->info.groupId == 0) { if (pBlock->info.groupId == 0) {
pBlock->info.groupId = pPos->groupId; pBlock->info.groupId = pPos->groupId;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.groupId != pPos->groupId) { if (pBlock->info.groupId != pPos->groupId) {

View File

@ -873,6 +873,26 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull); colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
} }
pDest->info.rows++; pDest->info.rows++;
if (i == 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation
if (pData != (void*)-1) {
memcpy(pDest->info.parTbName, varDataVal(pData), varDataLen(pData));
} else {
pDest->info.parTbName[0] = 0;
}
}
} }
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
pDest->info.groupId = pParInfo->groupId; pDest->info.groupId = pParInfo->groupId;
@ -895,6 +915,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
} else { } else {
SPartitionDataInfo newParData = {0}; SPartitionDataInfo newParData = {0};
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
/*newParData.tbname = */
newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
taosArrayPush(newParData.rowIds, &i); taosArrayPush(newParData.rowIds, &i);
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo)); taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
@ -1000,6 +1021,20 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
} }
} }
if (pPartNode->pSubtable != NULL) {
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
if (pSubTableExpr == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
int32_t keyLen = 0; int32_t keyLen = 0;
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
pInfo->partitionSup.pGroupCols); pInfo->partitionSup.pGroupCols);

View File

@ -924,29 +924,9 @@ _error:
return NULL; return NULL;
} }
static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
#if 0
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
for (int32_t i = 0; i < total; i++) {
SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i);
taosArrayDestroy(p->pDataBlock);
taosMemoryFree(p);
}
}
#endif
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
#if 0
size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->validBlockIndex = 0;
for (int32_t i = 0; i < total; ++i) {
SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i);
blockDataDestroy(p);
}
taosArrayClear(pInfo->pBlockLists);
#endif
} }
static bool isSessionWindow(SStreamScanInfo* pInfo) { static bool isSessionWindow(SStreamScanInfo* pInfo) {

View File

@ -1536,6 +1536,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
} }
} }
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
/*taosHashRemove(pInfo->pGroupIdTbNameMap, &pWinKey->groupId, sizeof(int64_t));*/
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3045,7 +3046,7 @@ void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock*
// clear the existed group id // clear the existed group id
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
buildDataBlockFromGroupRes(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
} }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
@ -3240,6 +3241,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
}
ASSERT(pBlock->info.type != STREAM_INVERT); ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
@ -3477,6 +3483,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->delKey.ts = INT64_MAX; pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0; pInfo->delKey.groupId = 0;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
@ -5659,6 +5668,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
printDataBlock(pBlock, "single interval recv"); printDataBlock(pBlock, "single interval recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
}
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL); doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
@ -5806,6 +5820,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->delKey.ts = INT64_MAX; pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0; pInfo->delKey.groupId = 0;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamIntervalOperator"; pOperator->name = "StreamIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
pOperator->blocking = true; pOperator->blocking = true;

View File

@ -783,7 +783,7 @@ static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTa
pTbReq->name = strdup(tname); pTbReq->name = strdup(tname);
pTbReq->ctb.suid = suid; pTbReq->ctb.suid = suid;
pTbReq->ctb.tagNum = tagNum; pTbReq->ctb.tagNum = tagNum;
if (sname) pTbReq->ctb.name = strdup(sname); if (sname) pTbReq->ctb.stbName = strdup(sname);
pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.pTag = (uint8_t*)pTag;
pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->ctb.tagName = taosArrayDup(tagName);
pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL; pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL;
@ -2469,9 +2469,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
pTableMeta->tableInfo.numOfTags); pTableMeta->tableInfo.numOfTags);
taosArrayDestroy(tagName); taosArrayDestroy(tagName);
smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1);
memcpy(smlHandle->tableExecHandle.createTblReq.ctb.name, sTableName, sTableNameLen); memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen);
smlHandle->tableExecHandle.createTblReq.ctb.name[sTableNameLen] = 0; smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0;
STableDataBlocks* pDataBlock = NULL; STableDataBlocks* pDataBlock = NULL;
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),

View File

@ -5381,10 +5381,10 @@ static EDealRes rewriteSubtable(SNode** pNode, void* pContext) {
found = true; found = true;
break; break;
} }
}
if (!found) { if (!found) {
return generateDealNodeErrMsg(pCxt->pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)*pNode)->colName); return generateDealNodeErrMsg(pCxt->pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)*pNode)->colName);
} }
}
return DEAL_RES_IGNORE_CHILD; return DEAL_RES_IGNORE_CHILD;
} }
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
@ -6454,7 +6454,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
} }
req.ctb.suid = suid; req.ctb.suid = suid;
req.ctb.tagNum = tagNum; req.ctb.tagNum = tagNum;
req.ctb.name = strdup(sTableNmae); req.ctb.stbName = strdup(sTableNmae);
req.ctb.pTag = (uint8_t*)pTag; req.ctb.pTag = (uint8_t*)pTag;
req.ctb.tagName = taosArrayDup(tagName); req.ctb.tagName = taosArrayDup(tagName);
if (pStmt->ignoreExists) { if (pStmt->ignoreExists) {

View File

@ -35,6 +35,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version); pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.watermark = be64toh(pRetrieve->watermark); pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId; pDataBlock->info.childId = pReq->upstreamChildId;

View File

@ -195,6 +195,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);
pRetrieve->watermark = htobe64(pBlock->info.watermark); pRetrieve->watermark = htobe64(pBlock->info.watermark);
memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
@ -250,7 +251,13 @@ FAIL:
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) { int64_t groupId) {
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); char* ctbName;
if (pDataBlock->info.parTbName[0]) {
ctbName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
}
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/

View File

@ -60,6 +60,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
if (!specPath) { if (!specPath) {
sprintf(statePath, "%s/%d", path, pTask->taskId); sprintf(statePath, "%s/%d", path, pTask->taskId);
} else { } else {
memset(statePath, 0, 300);
strncpy(statePath, path, 300); strncpy(statePath, path, 300);
} }
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {