Merge remote-tracking branch 'origin/3.0' into fix/tsim

This commit is contained in:
Shengliang Guan 2022-06-25 18:03:23 +08:00
commit 8fe87e9e0e
33 changed files with 834 additions and 308 deletions

View File

@ -42,12 +42,13 @@ enum {
typedef enum EStreamType { typedef enum EStreamType {
STREAM_NORMAL = 1, STREAM_NORMAL = 1,
STREAM_INVERT, STREAM_INVERT,
STREAM_REPROCESS, STREAM_CLEAR,
STREAM_INVALID, STREAM_INVALID,
STREAM_GET_ALL, STREAM_GET_ALL,
STREAM_DELETE, STREAM_DELETE,
STREAM_RETRIEVE, STREAM_RETRIEVE,
STREAM_PUSH_DATA, STREAM_PUSH_DATA,
STREAM_PUSH_EMPTY,
} EStreamType; } EStreamType;
typedef struct { typedef struct {

View File

@ -224,6 +224,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock(); SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
@ -236,6 +237,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t*
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks, const char* flag); void blockDebugShowData(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);

View File

@ -319,7 +319,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
return -1; return -1;
} }
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) { } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);

View File

@ -32,12 +32,15 @@ typedef struct SUpdateInfo {
int64_t interval; int64_t interval;
int64_t watermark; int64_t watermark;
TSKEY minTS; TSKEY minTS;
SScalableBf* pCloseWinSBF;
} SUpdateInfo; } SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0; int32_t code = 0;
ASSERT(numOfRows > 0); //ASSERT(numOfRows > 0);
if (numOfRows == 0) { if (numOfRows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1230,6 +1230,32 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
return 0; return 0;
} }
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
ASSERT(src != NULL && dst != NULL);
blockDataCleanup(dst);
int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL) {
continue;
}
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
}
dst->info.rows = src->info.rows;
dst->info.window = src->info.window;
return TSDB_CODE_SUCCESS;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return NULL; return NULL;
@ -1627,6 +1653,56 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
} }
} }
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
int32_t size = 2048;
*pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf;
char pBuf[128] = {0};
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_UINT:
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
break;
case TSDB_DATA_TYPE_UBIGINT:
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");
}
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
return dumpBuf;
}
/** /**
* @brief TODO: Assume that the final generated result it less than 3M * @brief TODO: Assume that the final generated result it less than 3M
* *

View File

@ -401,6 +401,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
} }
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) { } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName); SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
return -1;
}
topicObj.stbUid = pStb->uid; topicObj.stbUid = pStb->uid;
} }
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/

View File

@ -257,7 +257,7 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
case TDMT_STREAM_TASK_RECOVER_RSP: case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg); return sndProcessTaskRecoverRsp(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg); return sndProcessTaskRetrieveRsp(pSnode, pMsg);
default: default:
ASSERT(0); ASSERT(0);
} }

View File

@ -196,6 +196,7 @@ struct SVnodeCfg {
typedef struct { typedef struct {
TSKEY lastKey; TSKEY lastKey;
uint64_t uid; uint64_t uid;
uint64_t groupId;
} STableKeyInfo; } STableKeyInfo;
struct SMetaEntry { struct SMetaEntry {

View File

@ -109,11 +109,15 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
if (pHandle->pMsg == NULL) return false;
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
} }
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) {
pHandle->pMsg = NULL;
return false;
}
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
return true; return true;

View File

@ -18,57 +18,87 @@
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) { const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
// cal size
int32_t cap = sizeof(SSubmitReq);
int32_t sz = taosArrayGetSize(pBlocks); int32_t sz = taosArrayGetSize(pBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
int32_t schemaLen = 0;
if (createTb) { if (createTb) {
SVCreateTbReq createTbReq = {0}; schemaReqs = taosArrayInit(sz, sizeof(void*));
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
createTbReq.name = cname; for (int32_t i = 0; i < sz; i++) {
createTbReq.flags = 0; SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
createTbReq.type = TSDB_CHILD_TABLE; STagVal tagVal = {
createTbReq.ctb.suid = suid; .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
STagVal tagVal = { .i64 = (int64_t)pDataBlock->info.groupId,
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
}; };
STag* pTag = NULL; STag* pTag = NULL;
taosArrayClear(tagArray); taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) { if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq); taosArrayDestroy(schemaReqs);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return NULL; return NULL;
} }
SVCreateTbReq createTbReq = {0};
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
createTbReq.ctb.pTag = (uint8_t*)pTag; createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code; int32_t code;
int32_t schemaLen;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) { if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL; return NULL;
} }
}
void* schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosArrayPush(schemaReqs, &schemaStr);
taosArrayPush(schemaReqSz, &schemaLen);
SEncoder encoder = {0};
tEncoderInit(&encoder, schemaStr, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
}
}
taosArrayDestroy(tagArray);
// cal size
int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
int32_t schemaLen = 0;
if (createTb) {
schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
} }
@ -99,55 +129,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0; int32_t schemaLen = 0;
if (createTb) { if (createTb) {
SVCreateTbReq createTbReq = {0}; schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); void* schemaStr = taosArrayGetP(schemaReqs, i);
createTbReq.name = cname; memcpy(blkSchema, schemaStr, schemaLen);
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, blkSchema, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
} }
blkHead->schemaLen = htonl(schemaLen); blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0}; SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version); tdSRowInit(&rb, pTSchema->version);
@ -175,7 +163,10 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
ret->length = htonl(ret->length); ret->length = htonl(ret->length);
taosArrayDestroy(tagArray);
if (schemaReqs) taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return ret; return ret;
} }

View File

@ -2852,7 +2852,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
break; break;
} }
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0};
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }

View File

@ -293,6 +293,7 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES, STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER, STREAM_SCAN_FROM_DATAREADER,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
} EStreamScanMode; } EStreamScanMode;
typedef struct SCatchSupporter { typedef struct SCatchSupporter {
@ -348,7 +349,9 @@ typedef struct SStreamBlockScanInfo {
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
@ -427,8 +430,13 @@ typedef struct SStreamFinalIntervalOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SArray* pChildren; SArray* pChildren;
SSDataBlock* pUpdateRes; SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal; bool isFinal;
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
} SStreamFinalIntervalOperatorInfo; } SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
@ -851,6 +859,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createPullDataBlock();
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -315,7 +315,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
for (int i = 0; i < taosArrayGetSize(res); i++) { for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
taosArrayDestroy(res); taosArrayDestroy(res);
@ -336,7 +336,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
} }
}else { // Create one table group. }else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = tableUid}; STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES); pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);

View File

@ -4028,6 +4028,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
int32_t len = (int32_t)(pStart - (char*)keyBuf); int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t groupId = calcGroupId(keyBuf, len); uint64_t groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
info->groupId = groupId;
groupNum++; groupNum++;
nodesClearList(groupNew); nodesClearList(groupNew);
@ -4126,7 +4127,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
} else { // Create one table group. } else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
taosArrayPush(pTableListInfo->pTableList, &info); taosArrayPush(pTableListInfo->pTableList, &info);
} }

View File

@ -776,15 +776,14 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
} }
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
STimeWindow win = { STimeWindow win = {
.skey = INT64_MIN, .skey = INT64_MIN,
.ekey = INT64_MAX, .ekey = INT64_MAX,
}; };
bool needRead = false; bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) { if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
@ -793,14 +792,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
int64_t gap = pInfo->sessionSup.gap; int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex); getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win; win = pCurWin->win;
pInfo->updateResIndex += (*pRowIndex) +=
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval, win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey, (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC); binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
needRead = true; needRead = true;
@ -823,6 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
return true; return true;
@ -862,12 +864,12 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
*/ */
} }
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult == NULL) { if (pResult == NULL) {
if (prepareDataScan(pInfo)) { if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pSnapshotReadOp);
} }
@ -916,7 +918,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
pUpdateBlock->info.rows = i; pUpdateBlock->info.rows = i;
pInfo->tsArrayIndex += i; pInfo->tsArrayIndex += i;
pUpdateBlock->info.groupId = pInfo->groupId; pUpdateBlock->info.groupId = pInfo->groupId;
pUpdateBlock->info.type = STREAM_REPROCESS; pUpdateBlock->info.type = STREAM_CLEAR;
blockDataUpdateTsWindow(pUpdateBlock, 0); blockDataUpdateTsWindow(pUpdateBlock, 0);
} }
// all rows have same group id // all rows have same group id
@ -970,6 +972,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
if (pBlock->info.type == STREAM_RETRIEVE) {
pInfo->blockType = STREAM_DATA_TYPE_SUBMIT_BLOCK;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
}
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { } else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
@ -979,28 +989,39 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
if (!isStateWindow(pInfo)) { if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo); prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
} }
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
if (pSDB != NULL) {
getUpdateDataBlock(pInfo, true, pSDB, NULL);
pSDB->info.type = STREAM_PUSH_DATA;
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else { } else {
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) { if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo); prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
} }
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo); SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB == NULL) { if (pSDB == NULL) {
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.rows > 0) {
if (!isStateWindow(pInfo)) { if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo); // Todo(liuyao) mybe can delete this.
bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
ASSERT(test == false);
} }
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} else { } else {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} }
} else { } else {
pSDB->info.type = STREAM_NORMAL;
getUpdateDataBlock(pInfo, true, pSDB, NULL); getUpdateDataBlock(pInfo, true, pSDB, NULL);
return pSDB; return pSDB;
} }
@ -1070,10 +1091,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
taosArrayDestroy(block.pDataBlock); taosArrayDestroy(block.pDataBlock);
if (pInfo->pRes->pDataBlock == NULL) { if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log // TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
@ -1091,12 +1114,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pBlockInfo->rows; pOperator->resultInfo.totalRows += pBlockInfo->rows;
if (pBlockInfo->rows == 0) { if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) { } else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0; pInfo->tsArrayIndex = 0;
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes); getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) { if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) { } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
@ -1209,6 +1233,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->groupId = 0; pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
@ -1965,7 +1990,10 @@ _error:
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo; STableListInfo* tableListInfo;
int32_t currentGroupId; int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT* SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle; SReadHandle readHandle;
@ -2006,7 +2034,7 @@ typedef struct STableMergeScanInfo {
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo; } STableMergeScanInfo;
@ -2030,6 +2058,22 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subTableList);
}
return TSDB_CODE_SUCCESS;
}
// todo refactor // todo refactor
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
@ -2216,34 +2260,32 @@ SArray* generateSortByTsInfo(int32_t order) {
return pList; return pList;
} }
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tmp, taosArrayGet(tableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, tmp, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(tmp);
}
return TSDB_CODE_SUCCESS;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId); {
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < tableListSize; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList, int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
STableListInfo* tableListInfo = pInfo->tableListInfo;
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
pInfo->dataReaders, pInfo->queryId, pInfo->taskId); pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
// todo the total available buffer should be determined by total capacity of buffer of this task. // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result // the additional one is reserved for merge result
int32_t tableLen = taosArrayGetSize(tableList); pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
@ -2330,43 +2372,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
if (pInfo->currentGroupId == -1) { if (tableListSize == 0) {
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); SSDataBlock* pBlock = NULL;
if (pBlock != NULL) { while (pInfo->tableStartIndex < tableListSize) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t)); pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if(groupId) pBlock->info.groupId = *groupId; if (pBlock != NULL) {
pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock; return pBlock;
} else {
stopGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) {
doSetOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId =
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
} }
stopGroupTableMergeScan(pOperator);
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
startGroupTableMergeScan(pOperator);
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
if(groupId) pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
}
doSetOperatorCompleted(pOperator);
return pBlock; return pBlock;
} }
@ -2403,6 +2440,12 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
const STableKeyInfo* info1 = p1;
const STableKeyInfo* info2 = p2;
return info1->groupId - info2->groupId;
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
uint64_t taskId) { uint64_t taskId) {
@ -2411,6 +2454,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
if (pTableScanNode->pPartitionTags) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
}
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
@ -2443,7 +2489,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
pInfo->queryId = queryId; pInfo->queryId = queryId;
pInfo->taskId = taskId; pInfo->taskId = taskId;
pInfo->currentGroupId = -1;
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

View File

@ -15,6 +15,7 @@
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tfill.h" #include "tfill.h"
#include "ttime.h" #include "ttime.h"
@ -26,6 +27,16 @@ typedef enum SResultTsInterpType {
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_OP(op) ((op)->isFinal)
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
typedef struct SPullWindowInfo {
STimeWindow window;
uint64_t groupId;
} SPullWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
@ -684,11 +695,13 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
} }
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) return; if (pBlock == NULL){
SArray* blocks = taosArrayInit(1, sizeof(SSDataBlock)); qDebug("======printDataBlock Block is Null");
taosArrayPush(blocks, pBlock); return;
blockDebugShowData(blocks, flag); }
taosArrayDestroy(blocks); char *pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFree(pBuf);
} }
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index); typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
@ -1217,30 +1230,40 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
} }
} }
void doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
int32_t numOfOutput) { int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) { if (!p1) {
// window has been closed // window has been closed
return; return false;
} }
doClearWindowImpl(p1, pAggSup->pResultBuf, pSup, numOfOutput); doClearWindowImpl(p1, pAggSup->pResultBuf, pSup, numOfOutput);
return true;
} }
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex, static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex,
int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) { int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pTsCol->pData;
uint64_t* pGpDatas = NULL;
if (pBlock->info.type == STREAM_RETRIEVE) {
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, 2);
pGpDatas = (uint64_t*)pGpCol->pData;
}
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); uint64_t groupId = pBlock->info.groupId;
if (pUpWins) { if (pGpDatas) {
groupId = pGpDatas[i];
}
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), groupId, numOfOutput);
if (pUpWins && res) {
taosArrayPush(pUpWins, &win); taosArrayPush(pUpWins, &win);
} }
} }
@ -1268,8 +1291,8 @@ bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark; return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
} }
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SArray* closeWins) { SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
@ -1280,10 +1303,20 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
SWinRes winRe = {.ts = win.skey, .groupId = groupId,};
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes));
if (isCloseWindow(&win, pSup)) { if (isCloseWindow(&win, pSup)) {
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; if (chIds && pPullDataMap) {
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SArray* chAy = *(SArray**) chIds;
taosHashRemove(pHashMap, keyBuf, keyLen); int32_t size = taosArrayGetSize(chAy);
qInfo("======window %ld wait child size:%d", win.skey ,size);
for (int32_t i = 0; i < size; i++) {
qInfo("======window %ld wait chid id:%d", win.skey ,*(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qInfo("======close window %ld", win.skey);
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte; SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
@ -1291,11 +1324,25 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
return code; return code;
} }
} }
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL);
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -1324,7 +1371,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
NULL); NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
@ -1345,7 +1392,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
} }
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
@ -1373,6 +1420,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
//it should be empty.
taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
if (pInfo->pChildren) { if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
@ -2164,6 +2216,24 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL; return p1 == NULL;
} }
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols,
int32_t startPos, TSKEY eKey, STimeWindow* pNextWin) {
int32_t forwardRows = getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos,
eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
int32_t prevEndPos = forwardRows - 1 + startPos;
return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}
void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) {
SArray* childIds = taosArrayInit(8, sizeof(int32_t));
for (int32_t i = 0; i < size; i++) {
taosArrayPush(childIds, &i);
}
taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*));
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) { SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
@ -2177,35 +2247,59 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t forwardRows = 0; int32_t forwardRows = 0;
if (pSDataBlock->pDataBlock != NULL) { ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
} else {
return;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
while (1) { while (1) {
if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) {
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) { bool ignore = true;
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,};
taosArrayPush(pUpWins, &nextWin); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pSup->numOfExprs, if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
pOperatorInfo->pTaskInfo); SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
taosArrayDestroy(pUpWins); // add pull data request
taosArrayPush(pInfo->pPullWins, &pull);
addPullWindow(pInfo->pPullDataMap, &winRes, taosArrayGetSize(pInfo->pChildren));
} else {
int32_t index = -1;
SArray* chArray = NULL;
if (chIds) {
chArray = *(void**) chIds;
int32_t chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (index != -1 && pSDataBlock->info.type == STREAM_PUSH_DATA) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
}
}
if ( index == -1 || pSDataBlock->info.type == STREAM_PUSH_DATA) {
ignore = false;
}
}
if (ignore) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
} }
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
@ -2230,14 +2324,17 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
} }
static void clearUpdateDataBlock(SSDataBlock* pBlock) { static void clearSpecialDataBlock(SSDataBlock* pBlock) {
if (pBlock->info.rows <= 0) {
return;
}
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
} }
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
// ASSERT(pDest->info.capacity >= pSource->info.rows); // ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity(pDest, pSource->info.rows); blockDataEnsureCapacity(pDest, pSource->info.rows);
clearUpdateDataBlock(pDest); clearSpecialDataBlock(pDest);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
@ -2254,7 +2351,63 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
blockDataUpdateTsWindow(pDest, 0); blockDataUpdateTsWindow(pDest, 0);
} }
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static bool needBreak(SStreamFinalIntervalOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->pPullWins);
if (pInfo->pullIndex < size) {
return true;
}
return false;
}
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
clearSpecialDataBlock(pBlock);
int32_t size = taosArrayGetSize(array);
if (size - (*pIndex) == 0) {
return;
}
blockDataEnsureCapacity(pBlock, size - (*pIndex) );
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
for (; (*pIndex) < size; (*pIndex)++) {
SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex) );
SColumnInfoData* pStartTs = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 0);
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
SColumnInfoData* pEndTs = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 1);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
SColumnInfoData* pGroupId = (SColumnInfoData*) taosArrayGet(pBlock->pDataBlock, 2);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
pBlock->info.rows++;
}
if ((*pIndex) == size) {
*pIndex = 0;
taosArrayClear(array);
}
blockDataUpdateTsWindow(pBlock, 0);
}
void processPushEmpty(SSDataBlock* pBlock, SHashObj* pMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, 2);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes));
if (chIds) {
SArray* chArray = *(SArray**) chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosHashRemove(pMap, &winRes, sizeof(SWinRes));
}
}
}
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
@ -2270,28 +2423,50 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) { if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (IS_FINAL_OP(pInfo) || pInfo->pUpdateRes->info.rows == 0) { if (!IS_FINAL_OP(pInfo)) {
if (!IS_FINAL_OP(pInfo)) { // semi interval operator clear disk buffer
// semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo);
clearStreamIntervalOperator(pInfo);
}
return NULL;
} }
return NULL;
}
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
} else {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes;
}
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
return pInfo->binfo.pRes; doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
} }
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes); clearSpecialDataBlock(pInfo->pUpdateRes);
pOperator->status = OP_RES_TO_RETURN;
qInfo("Stream Final Interval return data");
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PUSH_DATA || pBlock->info.type == STREAM_INVALID) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs, doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins); pBlock, pUpWins);
@ -2310,11 +2485,25 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
removeResults(pUpWins, pUpdated); removeResults(pUpWins, pUpdated);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
break; break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins);
removeResults(pUpWins, pUpdated);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) {
break;
}
continue;
} else if (pBlock->info.type == STREAM_PUSH_EMPTY && IS_FINAL_OP(pInfo)) {
processPushEmpty(pBlock, pInfo->pPullDataMap);
continue;
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
@ -2334,31 +2523,70 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, pBlock->info.window.ekey);
if (needBreak(pInfo)) {
break;
}
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
&pInfo->interval, pInfo->pPullDataMap, pUpdated);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
} }
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
pOperator->status = OP_RES_TO_RETURN; if (pInfo->binfo.pRes->info.rows != 0) {
if (pInfo->binfo.pRes->info.rows == 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
pOperator->status = OP_EXEC_DONE; return pInfo->binfo.pRes;
if (pInfo->pUpdateRes->info.rows == 0) { }
return NULL;
} if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
return pInfo->binfo.pRes;
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
// ASSERT(false);
return NULL;
}
SSDataBlock* createPullDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = STREAM_RETRIEVE;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t);
pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
} }
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -2412,23 +2640,30 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
} }
// semi interval operator does not catch result
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS; pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
pInfo->isFinal = true; pInfo->isFinal = true;
pOperator->name = "StreamFinalIntervalOperator"; pOperator->name = "StreamFinalIntervalOperator";
} else { } else {
// semi interval operator does not catch result
pInfo->isFinal = false; pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator"; pOperator->name = "StreamSemiIntervalOperator";
} }
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
} }
pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
@ -2811,11 +3046,6 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
} }
} }
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
SHashObj* pStDeleted, bool hasEndTs) { SHashObj* pStDeleted, bool hasEndTs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3027,14 +3257,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/ printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
/*printDataBlock(pBInfo->pRes, "session insert");*/ printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
@ -3048,7 +3278,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs,
pInfo->gap, pWins); pInfo->gap, pWins);
@ -3102,11 +3332,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
/*printDataBlock(pInfo->pDelRes, "session del");*/ printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
/*printDataBlock(pBInfo->pRes, "session insert");*/ printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo)? "Final Session" : "Single Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
@ -3169,11 +3399,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes); clearSpecialDataBlock(pInfo->pUpdateRes);
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins); doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins);
removeSessionResults(pStUpdated, pWins); removeSessionResults(pStUpdated, pWins);
@ -3236,7 +3466,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
} else { } else {
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS; pInfo->pUpdateRes->info.type = STREAM_CLEAR;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator"; pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet = pOperator->fpSet =
@ -3554,7 +3784,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
break; break;
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_CLEAR) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
pSeUpdated, pInfo->pSeDeleted); pSeUpdated, pInfo->pSeDeleted);
continue; continue;
@ -3900,18 +4130,22 @@ _error:
// merge interval operator // merge interval operator
typedef struct SMergeIntervalAggOperatorInfo { typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo; SIntervalAggOperatorInfo intervalAggOperatorInfo;
SList* groupIntervals;
SHashObj* groupIntervalHash; SListIter groupIntervalsIter;
void* groupIntervalIter;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
bool inputBlocksFinished; bool inputBlocksFinished;
} SMergeIntervalAggOperatorInfo; } SMergeIntervalAggOperatorInfo;
typedef struct SGroupTimeWindow {
uint64_t groupId;
STimeWindow window;
} SGroupTimeWindow;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
taosHashCleanup(miaInfo->groupIntervalHash); tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
} }
@ -3940,15 +4174,22 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
if (prevWin == NULL) { tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
return 0;
}
if ((ascScan && newWin->skey > prevWin->skey || (!ascScan) && newWin->skey < prevWin->skey)) { SListIter iter = {0};
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); SListNode* listNode = NULL;
while ((listNode = tdListNext(&iter)) != NULL) {
SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
if (prevGrpWin->groupId != tableGroupId ) {
continue;
}
STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey)) {
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode);
}
} }
return 0; return 0;
@ -4075,6 +4316,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pBlock == NULL) { if (pBlock == NULL) {
tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
miaInfo->inputBlocksFinished = true; miaInfo->inputBlocksFinished = true;
break; break;
} }
@ -4100,14 +4342,12 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
if (miaInfo->inputBlocksFinished) { if (miaInfo->inputBlocksFinished) {
void* win = taosHashIterate(miaInfo->groupIntervalHash, miaInfo->groupIntervalIter); SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
if (win != NULL) {
miaInfo->groupIntervalIter = win;
size_t len = 0; if (listNode != NULL) {
uint64_t* pTableGroupId = taosHashGetKey(win, &len); SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
finalizeWindowResult(pOperator, *pTableGroupId, win, pRes); finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.groupId = *pTableGroupId; pRes->info.groupId = grpWin->groupId;
} }
} }
@ -4129,8 +4369,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
goto _error; goto _error;
} }
miaInfo->groupIntervalHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
miaInfo->groupIntervalIter = NULL;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;

View File

@ -115,6 +115,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.srcNodeId = pTask->nodeId, .srcNodeId = pTask->nodeId,
.srcTaskId = pTask->taskId, .srcTaskId = pTask->taskId,
.pRetrieve = pRetrieve, .pRetrieve = pRetrieve,
.retrieveLen = dataStrLen,
}; };
int32_t sz = taosArrayGetSize(pTask->childEpInfo); int32_t sz = taosArrayGetSize(pTask->childEpInfo);
@ -146,7 +147,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.code = 0, .code = 0,
.msgType = TDMT_STREAM_RETRIEVE, .msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf, .pCont = buf,
.contLen = len, .contLen = sizeof(SMsgHead) + len,
}; };
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {

View File

@ -45,11 +45,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(false); ASSERT(false);
} }
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE && !hasData) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0}; //SSDataBlock block = {0};
block.info.type = STREAM_PUSH_DATA; //block.info.type = STREAM_PUSH_EMPTY;
block.info.childId = pTask->selfChildId; //block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true);
pBlock->info.type = STREAM_PUSH_EMPTY;
pBlock->info.childId = pTask->selfChildId;
taosArrayPush(pRes, pBlock);
} }
break; break;
} }
@ -109,7 +114,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (type == STREAM_INPUT__TRIGGER) { if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock); blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {

View File

@ -119,6 +119,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
taosArrayPush(pInfo->pTsBuckets, &dumy); taosArrayPush(pInfo->pTsBuckets, &dumy);
} }
pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
return pInfo; return pInfo;
} }
@ -154,6 +155,9 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) { if (ts < maxTs - pInfo->watermark) {
// this window has been closed. // this window has been closed.
if (pInfo->pCloseWinSBF) {
return tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
}
return true; return true;
} }
@ -193,3 +197,19 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
taosArrayDestroy(pInfo->pTsSBFs); taosArrayDestroy(pInfo->pTsSBFs);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) {
if (pInfo->pCloseWinSBF) {
return;
}
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
pInfo->pCloseWinSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
}
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
if (!pInfo || !pInfo->pCloseWinSBF) {
return;
}
tScalableBfDestroy(pInfo->pCloseWinSBF);
pInfo->pCloseWinSBF = NULL;
}

View File

@ -914,6 +914,9 @@ void syncNodeStart(SSyncNode* pSyncNode) {
syncNodeBecomeLeader(pSyncNode, "one replica start"); syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode);
return; return;
} }
@ -1662,6 +1665,12 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// change isStandBy to normal (election timeout) // change isStandBy to normal (election timeout)
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, tmpbuf); syncNodeBecomeLeader(pSyncNode, tmpbuf);
// Raft 3.6.2 Committing entries from previous terms
syncNodeReplicate(pSyncNode);
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode);
} else { } else {
syncNodeBecomeFollower(pSyncNode, tmpbuf); syncNodeBecomeFollower(pSyncNode, tmpbuf);
} }
@ -1807,16 +1816,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
// start replicate right now!
syncNodeReplicate(pSyncNode);
// start heartbeat timer // start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode); syncNodeStartHeartbeatTimer(pSyncNode);
// append noop
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
// trace log // trace log
do { do {
int32_t debugStrLen = strlen(debugStr); int32_t debugStrLen = strlen(debugStr);
@ -1841,9 +1843,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeReplicate(pSyncNode);
// do not use this syncNodeAppendNoop(pSyncNode);
// syncNodeEqNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode);
} }
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {

View File

@ -671,7 +671,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
fflush(pFile->fp); fflush(pFile->fp);
} }
bool taosValidFile(TdFilePtr pFile) { return pFile != NULL; } bool taosValidFile(TdFilePtr pFile) { return pFile != NULL && pFile->fd > 0; }
int32_t taosUmaskFile(int32_t maskVal) { int32_t taosUmaskFile(int32_t maskVal) {
#ifdef WINDOWS #ifdef WINDOWS

View File

@ -508,7 +508,6 @@ class TDDnode:
def stoptaosd(self): def stoptaosd(self):
if (not self.remoteIP == ""): if (not self.remoteIP == ""):
print("123")
self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1)) self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
tdLog.info("stop dnode%d"%self.index) tdLog.info("stop dnode%d"%self.index)
return return
@ -518,18 +517,21 @@ class TDDnode:
toBeKilled = "valgrind.bin" toBeKilled = "valgrind.bin"
if self.running != 0: if self.running != 0:
psCmd = "ps -ef|grep -w %s| grep dnode%d|grep -v grep | awk '{print $2}'" % (toBeKilled,self.index) if platform.system().lower() == 'windows':
processID = subprocess.check_output( os.system("wmic process where \"name='taosd.exe' and CommandLine like '%%dnode%d%%'\" get processId | xargs echo | awk '{print $2}' | xargs taskkill -f -pid"%self.index)
psCmd, shell=True).decode("utf-8") else:
psCmd = "ps -ef|grep -w %s| grep dnode%d|grep -v grep | awk '{print $2}'" % (toBeKilled,self.index)
while(processID):
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output( processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8") psCmd, shell=True).decode("utf-8")
if self.valgrind:
time.sleep(2) while(processID):
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
if self.valgrind:
time.sleep(2)
self.running = 0 self.running = 0
tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))

58
tests/script/test-all.bat Normal file
View File

@ -0,0 +1,58 @@
@echo off
SETLOCAL EnableDelayedExpansion
for /F "tokens=1,2 delims=#" %%a in ('"prompt #$H#$E# & echo on & for %%b in (1) do rem"') do ( set "DEL=%%a")
set /a a=0
echo Windows Taosd Full Test
set /a exitNum=0
rm -rf failed.txt
set caseFile="jenkins\\basic.txt"
if not "%2" == "" (
set caseFile="%2"
)
for /F "usebackq tokens=*" %%i in (!caseFile!) do (
set line=%%i
if "!line:~,9!" == "./test.sh" (
set /a a+=1
echo !a! Processing %%i
call :GetTimeSeconds !time!
set time1=!_timeTemp!
echo Start at !time!
call !line:./test.sh=wtest.bat! > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. )
)
)
exit !exitNum!
:colorEcho
set timeNow=%time%
call :GetTimeSeconds %timeNow%
set time2=%_timeTemp%
set /a interTime=%time2% - %time1%
echo End at %timeNow% , cast %interTime%s
echo off
<nul set /p ".=%DEL%" > "%~2"
findstr /v /a:%1 /R "^$" "%~2" nul
del "%~2" > nul 2>&1i
goto :eof
:GetTimeSeconds
set tt=%1
set tt=%tt:.= %
set tt=%tt::= %
set tt=%tt: 0= %
set /a index=1
for %%a in (%tt%) do (
if !index! EQU 1 (
set /a hh=%%a
)^
else if !index! EQU 2 (
set /a mm=%%a
)^
else if !index! EQU 3 (
set /a ss=%%a
)
set /a index=index+1
)
set /a _timeTemp=(%hh%*60+%mm%)*60+%ss%
goto :eof

View File

@ -80,17 +80,17 @@ endi
if $data03 != 4 then if $data03 != 4 then
print ======$data03 print ======$data03
return -1 goto loop1
endi endi
if $data04 != 52 then if $data04 != 52 then
print ======$data04 print ======$data04
return -1 goto loop1
endi endi
if $data05 != 13 then if $data05 != 13 then
print ======$data05 print ======$data05
return -1 goto loop1
endi endi
# row 1 # row 1
@ -179,7 +179,7 @@ sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1); sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2); sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ; sql create stream stream_t2 trigger at_once watermark 20s into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3); sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3); sql insert into ts1 values(1648791222001,2,2,3);

View File

@ -74,7 +74,7 @@ sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,t
sql create table ts1 using st tags(1,1,1); sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2); sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ; sql create stream stream_t2 trigger at_once watermark 20s into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1); sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2); sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3); sql insert into ts2 values(1648791211000,1,2,3,3);
@ -83,16 +83,16 @@ sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5); sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6); sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1); sql insert into ts1 values(1648791211000,1,2,3,7);
sql insert into ts1 values(1648791222001,2,2,3,2); sql insert into ts1 values(1648791222001,2,2,3,8);
sql insert into ts2 values(1648791211000,1,2,3,3); sql insert into ts2 values(1648791211000,1,2,3,9);
sql insert into ts2 values(1648791222001,2,2,3,4); sql insert into ts2 values(1648791222001,2,2,3,10);
$loop_count = 0 $loop_count = 0
loop2: loop2:
sleep 300 sleep 300
sql select * from streamtST; sql select * from streamtST order by c7 asc;
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 10 then
@ -104,8 +104,18 @@ print =====data01=$data01
goto loop2 goto loop2
endi endi
if $data02 != 1 then if $data11 != 1 then
print =====data02=$data02 print =====data11=$data11
goto loop2
endi
if $data21 != 1 then
print =====data21=$data21
goto loop2
endi
if $data31 != 2 then
print =====data31=$data31
goto loop2 goto loop2
endi endi
@ -114,8 +124,18 @@ print =====data03=$data03
goto loop2 goto loop2
endi endi
if $data04 != 2 then if $data13 != 2 then
print =====data04=$data04 print =====data13=$data13
goto loop2
endi
if $data23 != 1 then
print =====data23=$data23
goto loop2
endi
if $data33 != 4 then
print =====data33=$data33
goto loop2 goto loop2
endi endi

View File

@ -79,17 +79,17 @@ endi
if $data03 != 4 then if $data03 != 4 then
print ======$data03 print ======$data03
return -1 goto loop1
endi endi
if $data04 != 52 then if $data04 != 52 then
print ======$data04 print ======$data04
return -1 goto loop1
endi endi
if $data05 != 13 then if $data05 != 13 then
print ======$data05 print ======$data05
return -1 goto loop1
endi endi
# row 1 # row 1

View File

@ -20,7 +20,7 @@ class MyDnodes(TDDnodes):
self.simDeployed = False self.simDeployed = False
class TDTestCase: class TDTestCase:
noConn = True
def init(self,conn ,logSql): def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None self.TDDnodes = None
@ -40,7 +40,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
@ -81,7 +81,7 @@ class TDTestCase:
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\""
print(cmd) print(cmd)
os.system(cmd) os.system(cmd)

View File

@ -20,6 +20,7 @@ class MyDnodes(TDDnodes):
self.simDeployed = False self.simDeployed = False
class TDTestCase: class TDTestCase:
noConn = True
def init(self,conn ,logSql): def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
@ -40,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
@ -85,7 +86,7 @@ class TDTestCase:
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\""
print(cmd) print(cmd)
os.system(cmd) os.system(cmd)

View File

@ -86,7 +86,13 @@ class TMQCom:
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath processorName = buildPath + '\\build\\bin\\tmq_sim.exe'
if alias != 0:
processorNameNew = buildPath + '\\build\\bin\\tmq_sim_new.exe'
shellCmd = 'cp %s %s'%(processorName, processorNameNew)
os.system(shellCmd)
processorName = processorNameNew
shellCmd = 'mintty -h never ' + processorName + ' -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &" shellCmd += "> nul 2>&1 &"
else: else:

View File

@ -288,7 +288,10 @@ class TDTestCase:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
os.system('pkill tmq_sim') if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM tmq_sim.exe")
else:
os.system('pkill tmq_sim')
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")

View File

@ -2,7 +2,7 @@
SETLOCAL EnableDelayedExpansion SETLOCAL EnableDelayedExpansion
for /F "tokens=1,2 delims=#" %%a in ('"prompt #$H#$E# & echo on & for %%b in (1) do rem"') do ( set "DEL=%%a") for /F "tokens=1,2 delims=#" %%a in ('"prompt #$H#$E# & echo on & for %%b in (1) do rem"') do ( set "DEL=%%a")
set /a a=0 set /a a=0
if %1 == full ( if "%1" == "full" (
echo Windows Taosd Full Test echo Windows Taosd Full Test
set /a exitNum=0 set /a exitNum=0
del /Q /F failed.txt del /Q /F failed.txt

View File

@ -23,6 +23,7 @@ import platform
import socket import socket
import threading import threading
from distutils.log import warn as printf from distutils.log import warn as printf
from tkinter import N
from fabric2 import Connection from fabric2 import Connection
sys.path.append("../pytest") sys.path.append("../pytest")
from util.log import * from util.log import *
@ -187,9 +188,9 @@ if __name__ == "__main__":
tdLog.info("Procedures for tdengine deployed in %s" % (host)) tdLog.info("Procedures for tdengine deployed in %s" % (host))
if platform.system().lower() == 'windows': if platform.system().lower() == 'windows':
fileName = fileName.replace("/", os.sep)
if (masterIp == "" and not fileName[0:12] == "0-others\\udf"): if (masterIp == "" and not fileName[0:12] == "0-others\\udf"):
threading.Thread(target=checkRunTimeError,daemon=True).start() threading.Thread(target=checkRunTimeError,daemon=True).start()
tdCases.logSql(logSql)
tdLog.info("Procedures for testing self-deployment") tdLog.info("Procedures for testing self-deployment")
tdDnodes.init(deployPath, masterIp) tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster) tdDnodes.setTestCluster(testCluster)
@ -208,18 +209,46 @@ if __name__ == "__main__":
uModule = importlib.import_module(moduleName) uModule = importlib.import_module(moduleName)
try: try:
ucase = uModule.TDTestCase() ucase = uModule.TDTestCase()
if ((json.dumps(updateCfgDict) == '{}') and (ucase.updatecfgDict is not None)): if ((json.dumps(updateCfgDict) == '{}') and hasattr(ucase, 'updatecfgDict')):
updateCfgDict = ucase.updatecfgDict updateCfgDict = ucase.updatecfgDict
updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode() updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode()
except Exception as r: except Exception as r:
print(r) print(r)
else: else:
pass pass
tdDnodes.deploy(1,updateCfgDict) if dnodeNums == 1 :
tdDnodes.start(1) tdDnodes.deploy(1,updateCfgDict)
conn = taos.connect( tdDnodes.start(1)
host="%s"%(host), tdCases.logSql(logSql)
config=tdDnodes.sim.getCfgDir()) else :
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums,mnodeNums=mnodeNums)
tdDnodes = ClusterDnodes(dnodeslist)
tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster)
tdDnodes.setValgrind(valgrind)
tdDnodes.stopAll()
for dnode in tdDnodes.dnodes:
tdDnodes.deploy(dnode.index,{})
for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql)
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
print(tdDnodes.getSimCfgPath(),host)
cluster.create_dnode(conn)
try:
if cluster.check_dnode(conn) :
print("check dnode ready")
except Exception as r:
print(r)
if ucase is not None and hasattr(ucase, 'noConn') and ucase.noConn == True:
conn = None
else:
conn = taos.connect(
host="%s"%(host),
config=tdDnodes.sim.getCfgDir())
if is_test_framework: if is_test_framework:
tdCases.runOneWindows(conn, fileName) tdCases.runOneWindows(conn, fileName)
else: else:
@ -307,4 +336,5 @@ if __name__ == "__main__":
tdCases.runOneLinux(conn, sp[0] + "_" + "restart.py") tdCases.runOneLinux(conn, sp[0] + "_" + "restart.py")
else: else:
tdLog.info("not need to query") tdLog.info("not need to query")
conn.close() if conn is not None:
conn.close()

View File

@ -156,6 +156,7 @@ void shellRunSingleCommandImp(char *command) {
} }
fname = sptr + 2; fname = sptr + 2;
while (*fname == ' ') fname++;
*sptr = '\0'; *sptr = '\0';
} }