other: merge with 3.0.
This commit is contained in:
commit
f6be0f0dfe
|
@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
|
|||
}
|
||||
|
||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
|
||||
uint32_t numOfRow2);
|
||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
||||
const SColumnInfoData* pSource, uint32_t numOfRow2);
|
||||
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
|
||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
|
||||
|
||||
|
@ -232,7 +232,7 @@ void blockDebugShowData(const SArray* dataBlocks);
|
|||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t uid, tb_uid_t suid);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||
|
|
|
@ -110,9 +110,9 @@ typedef struct SFileBlockInfo {
|
|||
#define FUNCTION_COV 38
|
||||
|
||||
typedef struct SResultRowEntryInfo {
|
||||
bool initialized:1; // output buffer has been initialized
|
||||
bool complete:1; // query has completed
|
||||
uint8_t isNullRes:6; // the result is null
|
||||
bool initialized/*:1*/; // output buffer has been initialized
|
||||
bool complete/*:1*/; // query has completed
|
||||
uint8_t isNullRes/*:6*/; // the result is null
|
||||
uint8_t numOfRes; // num of output result in current buffer
|
||||
} SResultRowEntryInfo;
|
||||
|
||||
|
|
|
@ -25,6 +25,8 @@ extern "C" {
|
|||
#ifndef _TSTREAM_H_
|
||||
#define _TSTREAM_H_
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
|
||||
enum {
|
||||
STREAM_TASK_STATUS__RUNNING = 1,
|
||||
STREAM_TASK_STATUS__STOP,
|
||||
|
@ -69,20 +71,24 @@ typedef struct {
|
|||
SUseDbRsp dbInfo;
|
||||
} STaskDispatcherShuffle;
|
||||
|
||||
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
int64_t stbUid;
|
||||
SSchemaWrapper* pSchemaWrapper;
|
||||
// not applicable to encoder and decoder
|
||||
void* vnode;
|
||||
FTbSink* tbSinkFunc;
|
||||
STSchema* pTSchema;
|
||||
SHashObj* pHash; // groupId to tbuid
|
||||
} STaskSinkTb;
|
||||
|
||||
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
|
||||
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
||||
|
||||
typedef struct {
|
||||
int64_t smaId;
|
||||
// following are not applicable to encoder and decoder
|
||||
FSmaHandle* smaHandle;
|
||||
FSmaSink* smaSink;
|
||||
} STaskSinkSma;
|
||||
|
||||
typedef struct {
|
||||
|
@ -115,7 +121,7 @@ enum {
|
|||
TASK_SINK__FETCH,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
struct SStreamTask {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int8_t status;
|
||||
|
@ -150,8 +156,7 @@ typedef struct {
|
|||
|
||||
// application storage
|
||||
void* ahandle;
|
||||
|
||||
} SStreamTask;
|
||||
};
|
||||
|
||||
SStreamTask* tNewSStreamTask(int64_t streamId);
|
||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||
|
|
|
@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
|
|||
|
||||
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
|
||||
int32_t mapIndex = i;
|
||||
// if (pIndexMap) {
|
||||
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
||||
// }
|
||||
// if (pIndexMap) {
|
||||
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
||||
// }
|
||||
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
||||
|
@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
|
||||
// cal size
|
||||
|
@ -1608,13 +1609,37 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
|||
// TODO min
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
cap += sizeof(SSubmitBlk) + rows * maxLen;
|
||||
int32_t schemaLen = 0;
|
||||
|
||||
if (createTb) {
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
createTbReq.name = "a";
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = htobe64(suid);
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
|
||||
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) return NULL;
|
||||
}
|
||||
|
||||
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
|
||||
}
|
||||
|
||||
// assign data
|
||||
ret = taosMemoryCalloc(1, cap);
|
||||
ret = taosMemoryCalloc(1, cap + 46);
|
||||
ret = POINTER_SHIFT(ret, 46);
|
||||
ret->header.vgId = vgId;
|
||||
ret->version = htonl(1);
|
||||
ret->length = htonl(cap - sizeof(SSubmitReq));
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||
|
@ -1623,11 +1648,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
|||
|
||||
SSubmitBlk* blkHead = submitBlk;
|
||||
blkHead->numOfRows = htons(pDataBlock->info.rows);
|
||||
blkHead->schemaLen = 0;
|
||||
blkHead->sversion = htonl(pTSchema->version);
|
||||
// TODO
|
||||
blkHead->suid = 0;
|
||||
blkHead->uid = htobe64(pDataBlock->info.uid);
|
||||
blkHead->suid = htobe64(suid);
|
||||
// uid is assigned by vnode
|
||||
blkHead->uid = 0;
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
|
||||
|
@ -1635,7 +1660,35 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
|||
blkHead->dataLen = 0;
|
||||
|
||||
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
|
||||
STSRow* rowData = blockData;
|
||||
|
||||
int32_t schemaLen = 0;
|
||||
if (createTb) {
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
createTbReq.name = "a";
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
|
||||
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) return NULL;
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, blockData, schemaLen);
|
||||
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
|
||||
tEncoderClear(&encoder);
|
||||
}
|
||||
blkHead->schemaLen = htonl(schemaLen);
|
||||
|
||||
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
SRowBuilder rb = {0};
|
||||
|
@ -1653,10 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
|||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
blkHead->dataLen += rowLen;
|
||||
}
|
||||
int32_t len = blkHead->dataLen;
|
||||
blkHead->dataLen = htonl(len);
|
||||
blkHead = POINTER_SHIFT(blkHead, len);
|
||||
int32_t dataLen = blkHead->dataLen;
|
||||
blkHead->dataLen = htonl(dataLen);
|
||||
|
||||
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
|
||||
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen);
|
||||
/*submitBlk = blkHead;*/
|
||||
}
|
||||
|
||||
ret->length = htonl(ret->length);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
|
||||
ASSERT(pIter->len > 0);
|
||||
}
|
||||
|
|
|
@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
|||
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
|
||||
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration
|
||||
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
|
||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
|
|
|
@ -51,6 +51,7 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
|
|||
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs;
|
||||
pMgmt->state = pInfo->vstat;
|
||||
|
||||
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
|
||||
taosArrayDestroy(vloads.pVloads);
|
||||
}
|
||||
|
||||
|
@ -177,6 +178,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||
msgCb.pWrapper = pMgmt->pWrapper;
|
||||
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
|
||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
|
|
|
@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
|
||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||
msgCb.pWrapper = pMgmt->pWrapper;
|
||||
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
|
||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
|
@ -147,7 +148,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
pThread->failed++;
|
||||
} else {
|
||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||
//vnodeStart(pImpl);
|
||||
// vnodeStart(pImpl);
|
||||
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->opened++;
|
||||
}
|
||||
|
|
|
@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
|
|||
pMsg->rpcMsg = *pRpc;
|
||||
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
|
||||
switch (qtype) {
|
||||
case WRITE_QUEUE:
|
||||
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
|
||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||
break;
|
||||
case QUERY_QUEUE:
|
||||
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
|
@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE);
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
|
||||
}
|
||||
|
|
|
@ -574,6 +574,7 @@ typedef struct {
|
|||
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||
char targetDb[TSDB_DB_FNAME_LEN];
|
||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
int64_t targetStbUid;
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
|
|
|
@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
/*int32_t outputNameSz = 0;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
|
@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
|
|
|
@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
|||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
}
|
||||
|
@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
|||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
}
|
||||
//
|
||||
|
||||
// dispatch
|
||||
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||
|
||||
|
@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
stbObj.uid = pStream->targetStbUid;
|
||||
|
||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
||||
|
||||
return 0;
|
||||
|
@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
|||
streamObj.createTime = taosGetTimestampMs();
|
||||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||
streamObj.dbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
streamObj.sql = pCreate->sql;
|
||||
|
|
|
@ -967,7 +967,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
|
|||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
pAction->errCode = 0;
|
||||
mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
|
||||
mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1043,7 +1043,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
|||
return errCode;
|
||||
}
|
||||
} else {
|
||||
mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions);
|
||||
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -141,16 +141,19 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// preprocess req
|
||||
pReq->uid = tGenIdPI64();
|
||||
pReq->ctime = taosGetTimestampMs();
|
||||
|
||||
// validate req
|
||||
metaReaderInit(&mr, pMeta, 0);
|
||||
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
||||
pReq->uid = mr.me.uid;
|
||||
if (pReq->type == TSDB_CHILD_TABLE) {
|
||||
pReq->ctb.suid = mr.me.ctbEntry.suid;
|
||||
}
|
||||
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
} else {
|
||||
pReq->uid = tGenIdPI64();
|
||||
pReq->ctime = taosGetTimestampMs();
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
|
||||
|
|
|
@ -884,9 +884,26 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||
if (pTask->execType == TASK_EXEC__NONE) return 0;
|
||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pRes = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
|
||||
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
|
||||
// build write msg
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_SUBMIT,
|
||||
.pCont = pReq,
|
||||
.contLen = ntohl(pReq->length),
|
||||
};
|
||||
|
||||
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
// expand runners
|
||||
pTask->exec.numOfRunners = parallel;
|
||||
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
|
||||
if (pTask->exec.runners == NULL) {
|
||||
|
@ -902,6 +919,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
|||
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.runners[i].executor);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
pTask->tbSink.vnode = pTq->pVnode;
|
||||
pTask->tbSink.tbSinkFunc = tqTableSink;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -925,7 +949,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
// sink
|
||||
pTask->ahandle = pTq->pVnode;
|
||||
if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaHandle = smaHandleRes;
|
||||
pTask->smaSink.smaSink = smaHandleRes;
|
||||
} else if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||
|
|
|
@ -76,6 +76,8 @@ struct SMemSkipListCurosr {
|
|||
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
|
||||
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
|
||||
|
||||
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
|
||||
|
||||
// SMemTable
|
||||
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
|
||||
SMemTable *pMemTb = NULL;
|
||||
|
@ -176,20 +178,19 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
|
|||
|
||||
// do insert data to SMemData
|
||||
SMemSkipListCurosr slc = {0};
|
||||
const uint8_t *p = pSubmitBlk->pData;
|
||||
const uint8_t *pt;
|
||||
const STSRow *pRow;
|
||||
uint64_t szRow;
|
||||
uint32_t szRow;
|
||||
SDecoder decoder = {0};
|
||||
|
||||
// tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
|
||||
tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData);
|
||||
for (;;) {
|
||||
// if (tDecodeIsEnd(&coder)) break;
|
||||
if (tDecodeIsEnd(&decoder)) break;
|
||||
|
||||
if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
|
||||
// terrno = TSDB_CODE_INVALID_MSG;
|
||||
// return -1;
|
||||
// }
|
||||
// check the row (todo)
|
||||
|
||||
// // move the cursor to position to write (todo)
|
||||
|
@ -197,11 +198,16 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
|
|||
// tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
|
||||
// ASSERT(c);
|
||||
|
||||
// // encode row
|
||||
// int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
|
||||
// int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
|
||||
// pSlNode = vnodeBufPoolMalloc(pPool, tsize);
|
||||
// pSlNode->level = level;
|
||||
// encode row
|
||||
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
|
||||
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (0 /*todo*/);
|
||||
SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize);
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pNode->level = level;
|
||||
|
||||
// uint8_t *pData = SL_NODE_DATA(pSlNode);
|
||||
// *(int64_t *)pData = version;
|
||||
|
@ -215,7 +221,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
|
|||
if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
|
||||
if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
|
||||
}
|
||||
// tCoderClear(&coder);
|
||||
tDecoderClear(&decoder);
|
||||
// tsdbMemSkipListCursorClose(&slc);
|
||||
|
||||
// update status
|
||||
|
@ -229,3 +235,18 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
|
||||
int8_t level = 1;
|
||||
int8_t tlevel;
|
||||
const uint32_t factor = 4;
|
||||
|
||||
if (pSl->size) {
|
||||
tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
|
||||
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
|
||||
level++;
|
||||
}
|
||||
}
|
||||
|
||||
return level;
|
||||
}
|
|
@ -15,8 +15,9 @@
|
|||
#ifndef TDENGINE_QUERYUTIL_H
|
||||
#define TDENGINE_QUERYUTIL_H
|
||||
|
||||
#include "tcommon.h"
|
||||
#include <libs/function/function.h>
|
||||
#include "tbuffer.h"
|
||||
#include "tcommon.h"
|
||||
#include "tpagedbuf.h"
|
||||
|
||||
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
|
||||
|
@ -56,9 +57,9 @@ typedef struct SResultRow {
|
|||
bool endInterp; // the time window end timestamp has done the interpolation already.
|
||||
bool closed; // this result status: closed or opened
|
||||
uint32_t numOfRows; // number of rows of current time window
|
||||
struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo
|
||||
STimeWindow win;
|
||||
char *key; // start key of current result row
|
||||
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
|
||||
// char *key; // start key of current result row
|
||||
} SResultRow;
|
||||
|
||||
typedef struct SResultRowPosition {
|
||||
|
|
|
@ -157,8 +157,6 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
|
|||
pResultRow->pageId = -1;
|
||||
pResultRow->offset = -1;
|
||||
pResultRow->closed = false;
|
||||
|
||||
taosMemoryFreeClear(pResultRow->key);
|
||||
pResultRow->win = TSWINDOW_INITIALIZER;
|
||||
}
|
||||
|
||||
|
|
|
@ -388,6 +388,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
// allocate a new buffer page
|
||||
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
|
||||
if (pResult == NULL) {
|
||||
ASSERT(pSup->resultRowSize > 0);
|
||||
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
|
||||
initResultRow(pResult);
|
||||
|
||||
|
@ -1147,12 +1148,17 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
|
||||
if (pCtx->sfp.getEnv != NULL) {
|
||||
pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
|
||||
} else { // to work around the interbuffer size requirement.
|
||||
ASSERT(fmIsUserDefinedFunc(pCtx->functionId));
|
||||
// env.calcMemSize = pFunct->resSchema.bytes;
|
||||
}
|
||||
}
|
||||
pCtx->resDataInfo.interBufSize = env.calcMemSize;
|
||||
// todo remove it later.
|
||||
// ASSERT(pCtx->resDataInfo.interBufSize > 0);
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
|
||||
pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||
// for simple column, the intermediate buffer needs to hold one element.
|
||||
// for simple column, the result buffer needs to hold at least one element.
|
||||
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
|
||||
}
|
||||
|
||||
|
@ -1872,7 +1878,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
|
|||
}
|
||||
|
||||
void initResultRow(SResultRow* pResultRow) {
|
||||
pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
|
||||
// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1884,7 +1890,7 @@ void initResultRow(SResultRow* pResultRow) {
|
|||
* offset[0] offset[1] offset[2]
|
||||
*/
|
||||
// TODO refactor: some function move away
|
||||
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) {
|
||||
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) {
|
||||
SqlFunctionCtx* pCtx = pInfo->pCtx;
|
||||
SSDataBlock* pDataBlock = pInfo->pRes;
|
||||
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
|
||||
|
@ -1897,6 +1903,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
|
|||
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
|
||||
pTaskInfo, false, pSup);
|
||||
|
||||
ASSERT(pDataBlock->info.numOfCols == numOfExprs);
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
|
||||
cleanupResultRowEntry(pEntry);
|
||||
|
@ -3624,7 +3631,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
|||
goto _error;
|
||||
}
|
||||
|
||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
|
||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num, pTaskInfo);
|
||||
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -4217,12 +4224,22 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
|||
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||
|
||||
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
|
||||
pAggSup->pResultRowHashTable == NULL) {
|
||||
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/");
|
||||
uint32_t defaultPgsz = 4096;
|
||||
while(defaultPgsz < pAggSup->resultRowSize*4) {
|
||||
defaultPgsz <<= 1u;
|
||||
}
|
||||
|
||||
// at least four pages need to be in buffer
|
||||
int32_t defaultBufsz = 4096 * 256;
|
||||
if (defaultBufsz <= defaultPgsz) {
|
||||
defaultBufsz = defaultPgsz * 4;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, "/tmp/");
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -4425,7 +4442,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
|
||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
||||
|
||||
pOperator->name = "ProjectOperator";
|
||||
|
|
|
@ -38,7 +38,8 @@
|
|||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
||||
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName);
|
||||
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||
const char* dbName);
|
||||
|
||||
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
|
@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||
uint32_t* status) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
|
||||
|
@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||
|
||||
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
|
||||
"-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||
"-%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||
|
||||
// do prepare for the next round table scan operation
|
||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||
|
@ -407,7 +410,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
|||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pInfo->dataReader = pReadHandle;
|
||||
// pInfo->prevGroupId = -1;
|
||||
// pInfo->prevGroupId = -1;
|
||||
|
||||
pOperator->name = "TableSeqScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
||||
|
@ -517,11 +520,11 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
|
|||
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
|
||||
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
|
||||
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
|
||||
taosArrayPush(pInfo->tsArray, ts+i);
|
||||
taosArrayPush(pInfo->tsArray, ts + i);
|
||||
}
|
||||
}
|
||||
if (taosArrayGetSize(pInfo->tsArray) > 0) {
|
||||
//TODO(liuyao) get from tsdb
|
||||
// TODO(liuyao) get from tsdb
|
||||
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
|
||||
// p->info.type = STREAM_INVERT;
|
||||
// taosArrayClear(pInfo->tsArray);
|
||||
|
@ -679,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan
|
||||
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan
|
||||
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
|
||||
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan
|
||||
if (pInfo->pUpdateInfo == NULL) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -702,11 +705,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
|||
pOperator->fpSet.closeFn = operatorDummyCloseFn;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
|
@ -775,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
|
|||
if (NULL == pCondition) {
|
||||
return;
|
||||
}
|
||||
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName);
|
||||
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
|
||||
}
|
||||
|
||||
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
|
@ -855,12 +859,12 @@ static SSDataBlock* buildSysTableMetaBlock() {
|
|||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
||||
size_t size = 0;
|
||||
const SSysTableMeta *pMeta = NULL;
|
||||
const SSysTableMeta* pMeta = NULL;
|
||||
getInfosDbMeta(&pMeta, &size);
|
||||
|
||||
int32_t index = 0;
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
|
@ -868,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
|
|||
|
||||
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
|
||||
|
||||
for(int32_t i = 0; i < pMeta[index].colNum; ++i) {
|
||||
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
|
||||
SColumnInfoData colInfoData = {0};
|
||||
colInfoData.info.colId = i + 1;
|
||||
colInfoData.info.type = pMeta[index].schema[i].type;
|
||||
|
@ -1102,17 +1106,18 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
|||
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||
// blockDataDestroy(p); todo handle memory leak
|
||||
// blockDataDestroy(p); todo handle memory leak
|
||||
|
||||
pInfo->pRes->info.rows = p->info.rows;
|
||||
return p->info.rows;
|
||||
}
|
||||
|
||||
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) {
|
||||
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||
const char* dbName) {
|
||||
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
int32_t numOfRows = p->info.rows;
|
||||
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
const SSysTableMeta* pm = &pSysDbTableMeta[i];
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||
|
@ -1133,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
|
|||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);
|
||||
|
||||
for(int32_t j = 4; j <= 8; ++j) {
|
||||
for (int32_t j = 4; j <= 8; ++j) {
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, j);
|
||||
colDataAppendNULL(pColInfoData, numOfRows);
|
||||
}
|
||||
|
@ -1172,7 +1177,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
|||
tNameAssign(&pInfo->name, pName);
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||
pInfo->readHandle = *(SReadHandle*) readHandle;
|
||||
pInfo->readHandle = *(SReadHandle*)readHandle;
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
} else {
|
||||
tsem_init(&pInfo->ready, 0, 0);
|
||||
|
@ -1208,8 +1213,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfExprs = pResBlock->info.numOfCols;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
|
||||
NULL, NULL, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
return pOperator;
|
||||
|
@ -1356,7 +1361,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
|
||||
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SSDataBlock* pResBlock, SArray* pColMatchInfo,
|
||||
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#include "ttime.h"
|
||||
#include "tdatablock.h"
|
||||
#include "executorimpl.h"
|
||||
#include "functionMgt.h"
|
||||
#include "tdatablock.h"
|
||||
#include "ttime.h"
|
||||
|
||||
typedef enum SResultTsInterpType {
|
||||
RESULT_ROW_START_INTERP = 1,
|
||||
|
@ -545,7 +545,6 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx,
|
||||
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep,
|
||||
int32_t order, bool timeWindowInterpo) {
|
||||
|
@ -981,10 +980,10 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
|
|||
}
|
||||
}
|
||||
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
|
||||
for ( int i = 0; i < num; i++) {
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (type == STREAM_INVERT) {
|
||||
fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
|
||||
} else if (type == STREAM_NORMAL){
|
||||
} else if (type == STREAM_NORMAL) {
|
||||
fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
|
||||
}
|
||||
}
|
||||
|
@ -1038,7 +1037,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
|
||||
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
||||
// TODO: remove for stream
|
||||
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
@ -1118,7 +1118,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -1177,7 +1177,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -1379,7 +1379,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1421,7 +1421,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_SUCCESS;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1432,8 +1432,8 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -1473,7 +1473,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destroySWindowOperatorInfo(pInfo, numOfCols);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
//#include "tfill.h"
|
||||
#include "function.h"
|
||||
#include "taggfunction.h"
|
||||
#include "tbuffer.h"
|
||||
|
@ -27,7 +26,6 @@
|
|||
#include "thistogram.h"
|
||||
#include "tpercentile.h"
|
||||
#include "ttszip.h"
|
||||
//#include "queryLog.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tudf.h"
|
||||
|
||||
|
|
|
@ -150,14 +150,14 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
|||
pRes = (SArray*)input;
|
||||
}
|
||||
|
||||
if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0;
|
||||
|
||||
// sink
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
/*blockDebugShowData(pRes);*/
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
|
||||
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||
//
|
||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||
//
|
||||
|
@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
|
||||
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
||||
|
@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
|
||||
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
|
||||
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
|
||||
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||
|
|
|
@ -59,6 +59,10 @@
|
|||
# ---- table
|
||||
./test.sh -f tsim/table/basic1.sim
|
||||
|
||||
# ---- tstream
|
||||
./test.sh -f tsim/tstream/basic0.sim
|
||||
|
||||
|
||||
# ---- tmq
|
||||
./test.sh -f tsim/tmq/basic1.sim
|
||||
./test.sh -f tsim/tmq/basic2.sim
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database d0 vgroups 1
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use d0
|
||||
|
||||
print =============== create super table, include column type for count/sum/min/max/first
|
||||
sql create table if not exists stb (ts timestamp, k int) tags (a int)
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create child table
|
||||
sql create table ct1 using stb tags(1000)
|
||||
sql create table ct2 using stb tags(2000)
|
||||
sql create table ct3 using stb tags(3000)
|
||||
|
||||
sql show tables
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
|
||||
|
||||
sql show stables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== insert data
|
||||
|
||||
sql insert into ct1 values('2022-05-08 03:42:00.000', 234)
|
||||
sleep 100
|
||||
|
||||
#===================================================================
|
||||
print =============== query data from child table
|
||||
|
||||
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02 $data03
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 234 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 234 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 234 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#===================================================================
|
||||
print =============== insert data
|
||||
|
||||
sql insert into ct1 values('2022-05-08 03:43:00.000', -111)
|
||||
sleep 100
|
||||
|
||||
#===================================================================
|
||||
print =============== query data from child table
|
||||
|
||||
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02 $data03
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != -111 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 234 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 123 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#===================================================================
|
||||
print =============== insert data
|
||||
|
||||
sql insert into ct1 values('2022-05-08 03:53:00.000', 789)
|
||||
sleep 100
|
||||
|
||||
#===================================================================
|
||||
print =============== query data from child table
|
||||
|
||||
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02 $data03
|
||||
print $data10 $data11 $data12 $data13
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != -111 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 234 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 123 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 789 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 789 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 789 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue