diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 410fa02ded..73d043b2d0 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -184,7 +184,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u int32_t getJsonValueLen(const char* data); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows); +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, + uint32_t numOfRows); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, @@ -225,15 +226,16 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n); -int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); -int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src); +int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); +int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src); SSDataBlock* createDataBlock(); void* blockDataDestroy(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); +SSDataBlock* createSpecialDataBlock(EStreamType type); -int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); +int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); @@ -249,7 +251,6 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); - char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c7f372f17b..bcb96dafcc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -140,7 +140,8 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { return TSDB_CODE_SUCCESS; } -static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) { +static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, + int32_t itemLen, int32_t numOfRows) { ASSERT(pColumnInfoData->info.bytes >= itemLen); size_t start = 1; @@ -148,21 +149,23 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren memcpy(pColumnInfoData->pData, pData, itemLen); int32_t t = 0; - int32_t count = log(numOfRows)/log(2); - while(t < count) { + int32_t count = log(numOfRows) / log(2); + while (t < count) { int32_t xlen = 1 << t; - memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen); + memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, + xlen * itemLen); t += 1; start += xlen; } // the tail part if (numOfRows > start) { - memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen); + memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, + (numOfRows - start) * itemLen); } if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { - for(int32_t i = 0; i < numOfRows; ++i) { + for (int32_t i = 0; i < numOfRows; ++i) { pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen; } @@ -170,7 +173,8 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren } } -int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) { +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, + uint32_t numOfRows) { ASSERT(pData != NULL && pColumnInfoData != NULL); int32_t len = pColumnInfoData->info.bytes; @@ -278,7 +282,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int } else { if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) { // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0; -// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes); + // ASSERT(finalNumOfRows * pColumnInfoData->info.bytes); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); if (tmp == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; @@ -557,7 +561,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { } int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { - int32_t numOfRows = *(int32_t*) buf; + int32_t numOfRows = *(int32_t*)buf; blockDataEnsureCapacity(pBlock, numOfRows); pBlock->info.rows = numOfRows; @@ -676,7 +680,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { * @return */ size_t blockDataGetSerialMetaSize(uint32_t numOfCols) { - // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column + // length | return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t); } @@ -1302,6 +1307,40 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { return TSDB_CODE_SUCCESS; } +SSDataBlock* createSpecialDataBlock(EStreamType type) { + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + pBlock->info.hasVarCol = false; + pBlock->info.groupId = 0; + pBlock->info.rows = 0; + pBlock->info.type = type; + pBlock->info.rowSize = + sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); + pBlock->info.watermark = INT64_MIN; + + pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; + infoData.info.bytes = sizeof(TSKEY); + // window start ts + taosArrayPush(pBlock->pDataBlock, &infoData); + // window end ts + taosArrayPush(pBlock->pDataBlock, &infoData); + + infoData.info.type = TSDB_DATA_TYPE_UBIGINT; + infoData.info.bytes = sizeof(uint64_t); + // uid + taosArrayPush(pBlock->pDataBlock, &infoData); + // group id + taosArrayPush(pBlock->pDataBlock, &infoData); + + // calculate start ts + taosArrayPush(pBlock->pDataBlock, &infoData); + // calculate end ts + taosArrayPush(pBlock->pDataBlock, &infoData); + + return pBlock; +} + SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if (pDataBlock == NULL) { return NULL; @@ -1426,7 +1465,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { } void colDataDestroy(SColumnInfoData* pColData) { - if(!pColData) return; + if (!pColData) return; if (IS_VAR_DATA_TYPE(pColData->info.type)) { taosMemoryFreeClear(pColData->varmeta.offset); } else { @@ -1693,7 +1732,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } struct tm ptm = {0}; taosLocalTime(&tt, &ptm); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms); @@ -1847,20 +1886,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) break; case TSDB_DATA_TYPE_VARCHAR: { memset(pBuf, 0, sizeof(pBuf)); - char* pData = colDataGetVarData(pColInfoData, j); + char* pData = colDataGetVarData(pColInfoData, j); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); memcpy(pBuf, varDataVal(pData), dataSize); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); if (len >= size - 1) return dumpBuf; - } break; + } break; case TSDB_DATA_TYPE_NCHAR: { - char* pData = colDataGetVarData(pColInfoData, j); + char* pData = colDataGetVarData(pColInfoData, j); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); memset(pBuf, 0, sizeof(pBuf)); - taosUcs4ToMbs((TdUcs4 *)varDataVal(pData), dataSize, pBuf); + taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); if (len >= size - 1) return dumpBuf; - } break; + } break; } } len += snprintf(dumpBuf + len, size - len, "\n"); @@ -1877,7 +1916,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @param pDataBlocks * @param vgId * @param suid - * + * */ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { @@ -1904,8 +1943,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); - int32_t rows = pDataBlock->info.rows; + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); + int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; // int64_t groupId = pDataBlock->info.groupId; @@ -1926,7 +1965,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB msgLen += sizeof(SSubmitBlk); int32_t dataLen = 0; - for (int32_t j = 0; j < rows; ++j) { // iterate by row + for (int32_t j = 0; j < rows; ++j) { // iterate by row tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen)); // set row buf bool isStartKey = false; int32_t offset = 0; @@ -2089,7 +2128,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ // flag segment. // the inital bit is for column info int32_t* flagSegment = (int32_t*)data; - *flagSegment = (1<<31); + *flagSegment = (1 << 31); data += sizeof(int32_t); @@ -2149,7 +2188,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { const char* pStart = pData; - int32_t version = *(int32_t*) pStart; + int32_t version = *(int32_t*)pStart; pStart += sizeof(int32_t); ASSERT(version == 1); @@ -2158,7 +2197,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t); // total rows sizeof(int32_t) - int32_t numOfRows = *(int32_t*)pStart; + int32_t numOfRows = *(int32_t*)pStart; pStart += sizeof(int32_t); // total columns sizeof(int32_t) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9b252df58b..c644f1d08c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -102,7 +102,7 @@ int metaCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); -int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp **pMetaRsp); +int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); @@ -173,7 +173,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); +int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver); +int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3f42d8360e..f8fac97710 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -816,7 +816,85 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); } -int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { +int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { + bool failed = false; + SDecoder* pCoder = &(SDecoder){0}; + SDeleteRes* pRes = &(SDeleteRes){0}; + + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); + if (pRes->uidList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + failed = true; + } + + tDecoderInit(pCoder, pReq, len); + tDecodeDeleteRes(pCoder, pRes); + /*ASSERT(pRes->skey != 0);*/ + /*ASSERT(pRes->ekey != 0);*/ + tDecoderClear(pCoder); + + int32_t sz = taosArrayGetSize(pRes->uidList); + if (sz == 0) { + taosArrayDestroy(pRes->uidList); + return 0; + } + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + blockDataEnsureCapacity(pDelBlock, sz); + pDelBlock->info.rows = sz; + pDelBlock->info.version = ver; + + for (int32_t i = 0; i < sz; i++) { + // start key column + SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); + colDataAppend(pStartCol, i, (const char*)&pRes->skey, false); // end key column + SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); + colDataAppend(pEndCol, i, (const char*)&pRes->ekey, false); + // uid column + SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); + int64_t* pUid = taosArrayGet(pRes->uidList, i); + colDataAppend(pUidCol, i, (const char*)pUid, false); + + colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); + colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); + colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); + } + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; + + qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); + + SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + pStreamBlock->type = STREAM_INPUT__DATA_BLOCK; + pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock)); + SSDataBlock block = {0}; + assignOneDataBlock(&block, pDelBlock); + block.info.type = STREAM_DELETE_DATA; + taosArrayPush(pStreamBlock->blocks, &block); + + if (!failed) { + if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { + qError("stream task input del failed, task id %d", pTask->taskId); + continue; + } + + if (streamSchedExec(pTask) < 0) { + qError("stream task launch failed, task id %d", pTask->taskId); + continue; + } + } else { + streamTaskInputFail(pTask); + } + } + + return 0; +} + +int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { void* pIter = NULL; bool failed = false; SStreamDataSubmit* pSubmit = NULL; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index ed7fa80c47..a57e8174fe 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,20 +213,25 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) { - if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; + if (vnodeIsRoleLeader(pTq->pVnode)) { + if (msgType == TDMT_VND_SUBMIT) { + if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; - void* data = taosMemoryMalloc(msgLen); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory"); - return -1; + void* data = taosMemoryMalloc(msgLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to copy data for stream since out of memory"); + return -1; + } + memcpy(data, msg, msgLen); + SSubmitReq* pReq = (SSubmitReq*)data; + pReq->version = ver; + + tqProcessSubmitReq(pTq, data, ver); + } + if (msgType == TDMT_VND_DELETE) { + tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); } - memcpy(data, msg, msgLen); - SSubmitReq* pReq = (SSubmitReq*)data; - pReq->version = ver; - - tqProcessStreamTrigger(pTq, data, ver); } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cd0c7ba1f3..96e8e44bc9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -371,7 +371,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { if (NULL == pMetaRsp) { return; } - + strcpy(pMetaRsp->dbFName, pVnode->config.dbname); pMetaRsp->dbId = pVnode->config.dbId; pMetaRsp->vgId = TD_VID(pVnode); @@ -527,7 +527,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR cRsp.code = TSDB_CODE_SUCCESS; tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); taosArrayPush(tbUids, &pCreateReq->uid); - vnodeUpdateMetaRsp(pVnode, cRsp.pMeta); + vnodeUpdateMetaRsp(pVnode, cRsp.pMeta); } taosArrayPush(rsp.pArray, &cRsp); @@ -1107,6 +1107,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq tDecoderInit(pCoder, pReq, len); tDecodeDeleteRes(pCoder, pRes); + ASSERT(pRes->skey != 0); + ASSERT(pRes->ekey != 0); for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid), diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3bed81f657..c248015604 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -38,11 +38,11 @@ extern "C" { #include "tlockfree.h" #include "tmsg.h" #include "tpagedbuf.h" -#include "tstreamUpdate.h" #include "tstream.h" +#include "tstreamUpdate.h" -#include "vnode.h" #include "executorInt.h" +#include "vnode.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); @@ -139,23 +139,23 @@ enum { }; typedef struct { - //TODO remove prepareStatus - STqOffsetVal prepareStatus; // for tmq - STqOffsetVal lastStatus; // for tmq - SMqMetaRsp metaRsp; // for tmq fetching meta - int8_t returned; - int64_t snapshotVer; + // TODO remove prepareStatus + STqOffsetVal prepareStatus; // for tmq + STqOffsetVal lastStatus; // for tmq + SMqMetaRsp metaRsp; // for tmq fetching meta + int8_t returned; + int64_t snapshotVer; - SSchemaWrapper *schema; - char tbName[TSDB_TABLE_NAME_LEN]; - SSDataBlock* pullOverBlk; // for streaming - SWalFilterCond cond; - int64_t lastScanUid; - int8_t recoverStep; + SSchemaWrapper* schema; + char tbName[TSDB_TABLE_NAME_LEN]; + SSDataBlock* pullOverBlk; // for streaming + SWalFilterCond cond; + int64_t lastScanUid; + int8_t recoverStep; SQueryTableDataCond tableCond; - int64_t recoverStartVer; - int64_t recoverEndVer; - SStreamState* pState; + int64_t recoverStartVer; + int64_t recoverEndVer; + SStreamState* pState; } SStreamTaskInfo; typedef struct { @@ -167,29 +167,29 @@ typedef struct { } SSchemaInfo; typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; - int64_t version; // used for stream to record wal version - SStreamTaskInfo streamInfo; - SSchemaInfo schemaInfo; - STableListInfo tableqinfoList; // this is a table list - const char* sql; // query sql string - jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SSubplan* pSubplan; + int64_t version; // used for stream to record wal version + SStreamTaskInfo streamInfo; + SSchemaInfo schemaInfo; + STableListInfo tableqinfoList; // this is a table list + const char* sql; // query sql string + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SSubplan* pSubplan; struct SOperatorInfo* pRoot; } SExecTaskInfo; enum { - OP_NOT_OPENED = 0x0, - OP_OPENED = 0x1, + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, - OP_EXEC_DONE = 0x9, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorFpSet { @@ -228,7 +228,7 @@ typedef struct SOperatorInfo { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_EXHAUSTED = 0x3, } EX_SOURCE_STATUS; @@ -251,26 +251,26 @@ typedef struct SLoadRemoteDataInfo { } SLoadRemoteDataInfo; typedef struct SLimitInfo { - SLimit limit; - SLimit slimit; - uint64_t currentGroupId; - int64_t remainGroupOffset; - int64_t numOfOutputGroups; - int64_t remainOffset; - int64_t numOfOutputRows; + SLimit limit; + SLimit slimit; + uint64_t currentGroupId; + int64_t remainGroupOffset; + int64_t numOfOutputGroups; + int64_t remainOffset; + int64_t numOfOutputRows; } SLimitInfo; typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; + SArray* pSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; // SArray, result block list, used to keep the multi-block that // passed by downstream operator SArray* pResultBlockList; - int32_t rspBlockIndex; // indicate the return block index in pResultBlockList - SSDataBlock* pDummyBlock; // dummy block, not keep data - bool seqLoadData; // sequential load data or not, false by default + int32_t rspBlockIndex; // indicate the return block index in pResultBlockList + SSDataBlock* pDummyBlock; // dummy block, not keep data + bool seqLoadData; // sequential load data or not, false by default int32_t current; SLoadRemoteDataInfo loadInfo; uint64_t self; @@ -278,22 +278,22 @@ typedef struct SExchangeInfo { } SExchangeInfo; typedef struct SColMatchInfo { - int32_t srcSlotId; // source slot id + int32_t srcSlotId; // source slot id int32_t colId; int32_t targetSlotId; - bool output; // todo remove this? + bool output; // todo remove this? bool reserved; - int32_t matchType; // determinate the source according to col id or slot id + int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; typedef struct SScanInfo { - int32_t numOfAsc; - int32_t numOfDesc; + int32_t numOfAsc; + int32_t numOfDesc; } SScanInfo; typedef struct SSampleExecInfo { - double sampleRatio; // data block sample ratio, 1 by default - uint32_t seed; // random seed value + double sampleRatio; // data block sample ratio, 1 by default + uint32_t seed; // random seed value } SSampleExecInfo; enum { @@ -305,39 +305,40 @@ typedef struct SAggSupporter { SSHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row - int32_t currentPageId; // current write page id + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // current write page id } SAggSupporter; typedef struct { - // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. - SInterval interval; - SAggSupporter *pAggSup; - SExprSupp *pExprSup; // expr supporter of aggregate operator + // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if + // current data block needs to be loaded. + SInterval interval; + SAggSupporter* pAggSup; + SExprSupp* pExprSup; // expr supporter of aggregate operator } SAggOptrPushDownInfo; typedef struct STableScanInfo { - STsdbReader* dataReader; - SReadHandle readHandle; + STsdbReader* dataReader; + SReadHandle readHandle; SFileBlockLoadRecorder readRecorder; - SScanInfo scanInfo; - int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer - SSDataBlock* pResBlock; - SArray* pColMatchInfo; - SExprSupp pseudoSup; - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - SSampleExecInfo sample; // sample execution info - int32_t currentGroupId; - int32_t currentTable; - int8_t scanMode; - int8_t noTable; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + SExprSupp pseudoSup; + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SSampleExecInfo sample; // sample execution info + int32_t currentGroupId; + int32_t currentTable; + int8_t scanMode; + int8_t noTable; SAggOptrPushDownInfo pdInfo; - int8_t assignBlockUid; + int8_t assignBlockUid; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -370,7 +371,7 @@ typedef struct STableMergeScanInfo { SArray* pColMatchInfo; int32_t numOfOutput; - SExprSupp pseudoSup; + SExprSupp pseudoSup; SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan @@ -384,26 +385,26 @@ typedef struct STableMergeScanInfo { } STableMergeScanInfo; typedef struct STagScanInfo { - SColumnInfo *pCols; - SSDataBlock *pRes; - SArray *pColMatchInfo; - int32_t curPos; - SReadHandle readHandle; - STableListInfo *pTableList; + SColumnInfo* pCols; + SSDataBlock* pRes; + SArray* pColMatchInfo; + int32_t curPos; + SReadHandle readHandle; + STableListInfo* pTableList; } STagScanInfo; typedef struct SLastrowScanInfo { - SSDataBlock *pRes; - SReadHandle readHandle; - void *pLastrowReader; - SArray *pColMatchInfo; - int32_t *pSlotIds; - SExprSupp pseudoExprSup; - int32_t retrieveType; - int32_t currentGroupIndex; - SSDataBlock *pBufferredRes; - SArray *pUidList; - int32_t indexOfBufferedRes; + SSDataBlock* pRes; + SReadHandle readHandle; + void* pLastrowReader; + SArray* pColMatchInfo; + int32_t* pSlotIds; + SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock* pBufferredRes; + SArray* pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -432,10 +433,10 @@ typedef struct SStreamAggSupporter { SArray* pCurWins; int32_t valueSize; int32_t keySize; - char* pKeyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row - int32_t currentPageId; // buffer page that is active + char* pKeyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // buffer page that is active SSDataBlock* pScanBlock; } SStreamAggSupporter; @@ -447,22 +448,22 @@ typedef struct SWindowSupporter { } SWindowSupporter; typedef struct SPartitionBySupporter { - SArray* pGroupCols; // group by columns, SArray - SArray* pGroupColVals; // current group column values, SArray - char* keyBuf; // group by keys for hash - bool needCalc; // partition by column + SArray* pGroupCols; // group by columns, SArray + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + bool needCalc; // partition by column } SPartitionBySupporter; typedef struct SPartitionDataInfo { - uint64_t groupId; - SArray* rowIds; + uint64_t groupId; + SArray* rowIds; } SPartitionDataInfo; typedef struct STimeWindowSupp { - int8_t calTrigger; - int64_t waterMark; - TSKEY maxTs; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + int8_t calTrigger; + int64_t waterMark; + TSKEY maxTs; + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; typedef struct SStreamScanInfo { @@ -487,22 +488,22 @@ typedef struct SStreamScanInfo { uint64_t groupId; SUpdateInfo* pUpdateInfo; - EStreamScanMode scanMode; - SOperatorInfo* pStreamScanOp; - SOperatorInfo* pTableScanOp; - SArray* childIds; - SWindowSupporter windowSup; - SPartitionBySupporter partitionSup; - SExprSupp* pPartScalarSup; - bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. - int32_t scanWinIndex; // for state operator - int32_t pullDataResIndex; - SSDataBlock* pPullDataRes; // pull data SSDataBlock - SSDataBlock* pDeleteDataRes; // delete data SSDataBlock - int32_t deleteDataIndex; - STimeWindow updateWin; - STimeWindowAggSupp twAggSup; - SSDataBlock* pUpdateDataRes; + EStreamScanMode scanMode; + SOperatorInfo* pStreamScanOp; + SOperatorInfo* pTableScanOp; + SArray* childIds; + SWindowSupporter windowSup; + SPartitionBySupporter partitionSup; + SExprSupp* pPartScalarSup; + bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. + int32_t scanWinIndex; // for state operator + int32_t pullDataResIndex; + SSDataBlock* pPullDataRes; // pull data SSDataBlock + SSDataBlock* pDeleteDataRes; // delete data SSDataBlock + int32_t deleteDataIndex; + STimeWindow updateWin; + STimeWindowAggSupp twAggSup; + SSDataBlock* pUpdateDataRes; // status for tmq SNodeList* pGroupTags; SNode* pTagCond; @@ -571,17 +572,17 @@ typedef struct SIntervalAggOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; bool invertible; - SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. + SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. bool ignoreExpiredData; SArray* pRecycledPages; - SArray* pDelWins; // SWinRes + SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; SNode* pCondition; } SIntervalAggOperatorInfo; typedef struct SMergeAlignedIntervalAggOperatorInfo { - SIntervalAggOperatorInfo *intervalAggOperatorInfo; + SIntervalAggOperatorInfo* intervalAggOperatorInfo; bool hasGroupId; uint64_t groupId; // current groupId @@ -592,63 +593,63 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { typedef struct SStreamFinalIntervalOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; // basic info - SAggSupporter aggSup; // aggregate supporter - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - int32_t order; // current SSDataBlock scan order + SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + int32_t order; // current SSDataBlock scan order STimeWindowAggSupp twAggSup; SArray* pChildren; SSDataBlock* pUpdateRes; bool returnUpdate; - SPhysiNode* pPhyNode; // create new child + SPhysiNode* pPhyNode; // create new child bool isFinal; SHashObj* pPullDataMap; - SArray* pPullWins; // SPullWindowInfo + SArray* pPullWins; // SPullWindowInfo int32_t pullIndex; SSDataBlock* pPullDataRes; bool ignoreExpiredData; SArray* pRecycledPages; - SArray* pDelWins; // SWinRes + SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; + SAggSupporter aggSup; - STableQueryInfo *current; - uint64_t groupId; - SGroupResInfo groupResInfo; - SExprSupp scalarExprSup; - SNode *pCondition; + STableQueryInfo* current; + uint64_t groupId; + SGroupResInfo groupResInfo; + SExprSupp scalarExprSup; + SNode* pCondition; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SNode* pFilterNode; // filter info, which is push down by optimizer - SArray* pPseudoColInfo; - SLimitInfo limitInfo; - bool mergeDataBlocks; - SSDataBlock* pFinalRes; - SNode* pCondition; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SNode* pFilterNode; // filter info, which is push down by optimizer + SArray* pPseudoColInfo; + SLimitInfo limitInfo; + bool mergeDataBlocks; + SSDataBlock* pFinalRes; + SNode* pCondition; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SArray* pPseudoColInfo; - SExprSupp scalarSup; - SNode* pCondition; - uint64_t groupId; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SArray* pPseudoColInfo; + SExprSupp scalarSup; + SNode* pCondition; + uint64_t groupId; - SSDataBlock* pNextGroupRes; + SSDataBlock* pNextGroupRes; } SIndefOperatorInfo; typedef struct SFillOperatorInfo { @@ -663,7 +664,7 @@ typedef struct SFillOperatorInfo { SArray* pColMatchColInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; - uint64_t curGroupId; // current handled group id + uint64_t curGroupId; // current handled group id SExprInfo* pExprInfo; int32_t numOfExpr; SExprInfo* pNotFillExprInfo; @@ -672,23 +673,23 @@ typedef struct SFillOperatorInfo { typedef struct SGroupbyOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; + SAggSupporter aggSup; - SArray* pGroupCols; // group by columns, SArray - SArray* pGroupColVals; // current group column values, SArray - SNode* pCondition; - bool isInit; // denote if current val is initialized or not - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SGroupResInfo groupResInfo; - SExprSupp scalarSup; + SArray* pGroupCols; // group by columns, SArray + SArray* pGroupColVals; // current group column values, SArray + SNode* pCondition; + bool isInit; // denote if current val is initialized or not + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SGroupResInfo groupResInfo; + SExprSupp scalarSup; } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { - uint64_t groupId; - int64_t numOfRows; - SArray* pPageList; + uint64_t groupId; + int64_t numOfRows; + SArray* pPageList; } SDataGroupInfo; // The sort in partition may be needed later. @@ -700,12 +701,12 @@ typedef struct SPartitionOperatorInfo { int32_t groupKeyLen; // total group by column width SHashObj* pGroupSet; // quick locate the window object for each result - SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file - int32_t rowCapacity; // maximum number of rows for each buffer page - int32_t* columnOffset; // start position for each column data - SArray* sortedGroupArray; // SDataGroupInfo sorted by group id - int32_t groupIndex; // group index - int32_t pageIndex; // page index of current group + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + SArray* sortedGroupArray; // SDataGroupInfo sorted by group id + int32_t groupIndex; // group index + int32_t pageIndex; // page index of current group SExprSupp scalarSup; } SPartitionOperatorInfo; @@ -719,52 +720,52 @@ typedef struct SWindowRowsSup { typedef struct SSessionAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; + SAggSupporter aggSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - int32_t tsSlotId; // primary timestamp slot id + bool reptScan; // next round scan + int64_t gap; // session window gap + int32_t tsSlotId; // primary timestamp slot id STimeWindowAggSupp twAggSup; - const SNode* pCondition; + const SNode* pCondition; } SSessionAggOperatorInfo; typedef struct SResultWindowInfo { SResultRowPosition pos; - STimeWindow win; - uint64_t groupId; - bool isOutput; - bool isClosed; + STimeWindow win; + uint64_t groupId; + bool isOutput; + bool isClosed; } SResultWindowInfo; typedef struct SStateWindowInfo { SResultWindowInfo winInfo; - SStateKeys stateKey; + SStateKeys stateKey; } SStateWindowInfo; typedef struct SStreamSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SStreamAggSupporter streamAggSup; - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; - int64_t gap; // session window gap - int32_t primaryTsIndex; // primary timestamp slot id - int32_t endTsIndex; // window end timestamp slot id - int32_t order; // current SSDataBlock scan order - STimeWindowAggSupp twAggSup; - SSDataBlock* pWinBlock; // window result - SqlFunctionCtx* pDummyCtx; // for combine - SSDataBlock* pDelRes; // delete result - SSDataBlock* pUpdateRes; // update window - bool returnUpdate; - SHashObj* pStDeleted; - void* pDelIterator; - SArray* pChildren; // cache for children's result; final stream operator - SPhysiNode* pPhyNode; // create new child - bool isFinal; - bool ignoreExpiredData; + SOptrBasicInfo binfo; + SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; + int64_t gap; // session window gap + int32_t primaryTsIndex; // primary timestamp slot id + int32_t endTsIndex; // window end timestamp slot id + int32_t order; // current SSDataBlock scan order + STimeWindowAggSupp twAggSup; + SSDataBlock* pWinBlock; // window result + SqlFunctionCtx* pDummyCtx; // for combine + SSDataBlock* pDelRes; // delete result + SSDataBlock* pUpdateRes; // update window + bool returnUpdate; + SHashObj* pStDeleted; + void* pDelIterator; + SArray* pChildren; // cache for children's result; final stream operator + SPhysiNode* pPhyNode; // create new child + bool isFinal; + bool ignoreExpiredData; } SStreamSessionAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { @@ -778,26 +779,26 @@ typedef struct SStreamPartitionOperatorInfo { } SStreamPartitionOperatorInfo; typedef struct STimeSliceOperatorInfo { - SSDataBlock* pRes; - STimeWindow win; - SInterval interval; - int64_t current; - SArray* pPrevRow; // SArray - SArray* pNextRow; // SArray - SArray* pLinearInfo; // SArray - bool fillLastPoint; - bool isPrevRowSet; - bool isNextRowSet; - int32_t fillType; // fill type - SColumn tsCol; // primary timestamp column - SExprSupp scalarSup; // scalar calculation - struct SFillColInfo* pFillColInfo; // fill column info + SSDataBlock* pRes; + STimeWindow win; + SInterval interval; + int64_t current; + SArray* pPrevRow; // SArray + SArray* pNextRow; // SArray + SArray* pLinearInfo; // SArray + bool fillLastPoint; + bool isPrevRowSet; + bool isNextRowSet; + int32_t fillType; // fill type + SColumn tsCol; // primary timestamp column + SExprSupp scalarSup; // scalar calculation + struct SFillColInfo* pFillColInfo; // fill column info } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; + SAggSupporter aggSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; @@ -811,52 +812,52 @@ typedef struct SStateWindowOperatorInfo { } SStateWindowOperatorInfo; typedef struct SStreamStateAggOperatorInfo { - SOptrBasicInfo binfo; - SStreamAggSupporter streamAggSup; - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; - int32_t primaryTsIndex; // primary timestamp slot id - int32_t order; // current SSDataBlock scan order - STimeWindowAggSupp twAggSup; - SColumn stateCol; - SqlFunctionCtx* pDummyCtx; // for combine - SSDataBlock* pDelRes; - SHashObj* pSeDeleted; - void* pDelIterator; - SArray* pChildren; // cache for children's result; - bool ignoreExpiredData; + SOptrBasicInfo binfo; + SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; + int32_t primaryTsIndex; // primary timestamp slot id + int32_t order; // current SSDataBlock scan order + STimeWindowAggSupp twAggSup; + SColumn stateCol; + SqlFunctionCtx* pDummyCtx; // for combine + SSDataBlock* pDelRes; + SHashObj* pSeDeleted; + void* pDelIterator; + SArray* pChildren; // cache for children's result; + bool ignoreExpiredData; } SStreamStateAggOperatorInfo; typedef struct SSortedMergeOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; + SAggSupporter aggSup; - SArray* pSortInfo; - int32_t numOfSources; - SSortHandle *pSortHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t resultRowFactor; - bool hasGroupVal; - SDiskbasedBuf *pTupleStore; // keep the final results - int32_t numOfResPerPage; - char** groupVal; - SArray *groupInfo; + SArray* pSortInfo; + int32_t numOfSources; + SSortHandle* pSortHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + int32_t resultRowFactor; + bool hasGroupVal; + SDiskbasedBuf* pTupleStore; // keep the final results + int32_t numOfResPerPage; + char** groupVal; + SArray* groupInfo; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { SOptrBasicInfo binfo; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SArray* pColMatchInfo; // for index map from table scan output - int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SArray* pColMatchInfo; // for index map from table scan output + int32_t bufPageSize; - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SLimitInfo limitInfo; - SNode* pCondition; + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + SLimitInfo limitInfo; + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -864,18 +865,18 @@ typedef struct STagFilterOperatorInfo { } STagFilterOperatorInfo; typedef struct SJoinOperatorInfo { - SSDataBlock *pRes; - int32_t joinType; - int32_t inputOrder; + SSDataBlock* pRes; + int32_t joinType; + int32_t inputOrder; - SSDataBlock *pLeft; - int32_t leftPos; - SColumnInfo leftCol; + SSDataBlock* pLeft; + int32_t leftPos; + SColumnInfo leftCol; - SSDataBlock *pRight; - int32_t rightPos; - SColumnInfo rightCol; - SNode *pCondAfterMerge; + SSDataBlock* pRight; + int32_t rightPos; + SColumnInfo rightCol; + SNode* pCondAfterMerge; } SJoinOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) @@ -884,8 +885,8 @@ typedef struct SJoinOperatorInfo { void doDestroyExchangeOperatorInfo(void* param); SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, - __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, - __optr_decode_fn_t decode, __optr_explain_fn_t explain); + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, + __optr_decode_fn_t decode, __optr_explain_fn_t explain); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); @@ -896,24 +897,26 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); -int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, +int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); -void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows); -void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); -int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); +void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); +void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf); +int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); -int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart); -void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, - SOperatorInfo* pOperator); +int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, + char** pNextStart); +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, + SOperatorInfo* pOperator); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); @@ -921,10 +924,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pC int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, const char* idStr); -void cleanupAggSup(SAggSupporter* pAggSup); -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); -void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); -void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); +void cleanupAggSup(SAggSupporter* pAggSup); +void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); +void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts); int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts); @@ -933,29 +936,37 @@ SSDataBlock* loadNextDataBlock(void* param); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); -SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, - char* pData, int16_t bytes, bool masterscan, uint64_t groupId, - SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); +SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, + int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, + bool isIntervalQuery, SAggSupporter* pSup); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, + const char* pUser, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, + SExecTaskInfo* pTaskInfo); +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, + SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); + STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, bool isStream); SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, @@ -975,33 +986,39 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, + SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, + SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, - SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, - SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); +SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, + SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo); int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, - int32_t numOfOutput, SArray* pPseudoList); + int32_t numOfOutput, SArray* pPseudoList); -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, + int32_t scanFlag, bool createDummyCol); bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); -void setTaskKilled(SExecTaskInfo* pTaskInfo); -void queryCostStatis(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo); +void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); @@ -1013,7 +1030,7 @@ int32_t getMaximumIdleDurationSec(); * nOptrWithVal: *nOptrWithVal save the number of optr with value * return: result code, 0 means success */ -int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length, int32_t *nOptrWithVal); +int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal); /* * ops: root operator, created by caller @@ -1026,7 +1043,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, char* sql, EOPTR_EXEC_MODEL model); -int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); @@ -1035,42 +1052,43 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, - __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); + __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, - int32_t size); -SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize); -SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, - TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); -SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, - TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); -bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); -bool functionNeedToExecute(SqlFunctionCtx* pCtx); -bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); -bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); -bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); -void printDataBlock(SSDataBlock* pBlock, const char* flag); + int32_t size); +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize); +SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, + int64_t gap, int32_t* pIndex); +SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, + int64_t gap, int32_t* pIndex); +bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); +bool functionNeedToExecute(SqlFunctionCtx* pCtx); +bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); +bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); +bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); +void printDataBlock(SSDataBlock* pBlock, const char* flag); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, - SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, - SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, + const int32_t* rowCellOffset, SSDataBlock* pBlock, + SExecTaskInfo* pTaskInfo); -int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idstr); +int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + const char* idstr); SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); -bool groupbyTbname(SNodeList* pGroupList); +bool groupbyTbname(SNodeList* pGroupList); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); -SSDataBlock* createSpecialDataBlock(EStreamType type); -void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); +void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b740ec21d3..60670b1a49 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -955,9 +955,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { pTableScanInfo->currentGroupId = -1; } -static void freeArray(void* array) { - taosArrayDestroy(array); -} +static void freeArray(void* array) { taosArrayDestroy(array); } static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { STableScanInfo* pTableScanInfo = pTableScanOp->info; @@ -971,15 +969,16 @@ static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { resetTableScanInfo(pTableScanOp->info, &win); } -static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) { +static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, + int64_t maxVersion) { SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; taosArrayClear(gpTbls); STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; - SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo)); + SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(tbls, &tblInfo); taosArrayPush(gpTbls, &tbls); - STimeWindow win = {.skey = startTs, .ekey = endTs}; + STimeWindow win = {.skey = startTs, .ekey = endTs}; STableScanInfo* pTableScanInfo = pTableScanOp->info; pTableScanInfo->cond.startVersion = -1; pTableScanInfo->cond.endVersion = maxVersion; @@ -1068,8 +1067,8 @@ static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlo if (hasGroup) { (*pRowIndex) += 1; } else { - (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, - binarySearchForKey, NULL, TSDB_ORDER_ASC); + (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); } do { preWin = endWin; @@ -1110,8 +1109,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) { SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j); SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j); - bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL); - char* pSrcData = colDataGetData(pSrcCol, i); + bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL); + char* pSrcData = colDataGetData(pSrcCol, i); colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull); } pResult->info.rows++; @@ -1152,7 +1151,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); int32_t dummy = 0; - int64_t version = pSrcBlock->info.version - 1; + int64_t version = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version); // gap must be 0. @@ -1201,15 +1200,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - int64_t version = pSrcBlock->info.version - 1; + int64_t version = pSrcBlock->info.version - 1; for (int32_t i = 0; i < rows;) { uint64_t srcUid = srcUidData[i]; uint64_t groupId = getGroupIdByData(pInfo, srcUid, tsCol[i], version); uint64_t srcGpId = srcGp[i]; - TSKEY calStartTs = tsCol[i]; + TSKEY calStartTs = tsCol[i]; colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false); STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i, pInfo->partitionSup.needCalc); - TSKEY calEndTs = tsCol[i - 1]; + TSKEY calEndTs = tsCol[i - 1]; colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false); colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false); colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); @@ -1277,10 +1276,10 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup); if ((update || closedWin) && out) { - uint64_t gpId = closedWin&&pInfo->partitionSup.needCalc ? - calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId) : 0; - appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, - &gpId); + uint64_t gpId = closedWin && pInfo->partitionSup.needCalc + ? calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId) + : 0; + appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId); } } if (out && pInfo->pUpdateDataRes->info.rows > 0) { @@ -1515,6 +1514,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { generateScanRange(pInfo, pBlock, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; return pInfo->pDeleteDataRes; } break; @@ -1915,7 +1915,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ASSERT(pHandle->tqReader); pInfo->tqReader = pHandle->tqReader; } - + pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; pInfo->interval = pTSInfo->pdInfo.interval; @@ -1947,8 +1947,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->windowSup = - (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; + pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pStreamScanOp = pOperator; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 26667cd64f..ab28c91867 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -35,14 +35,15 @@ typedef struct SPullWindowInfo { typedef struct SOpenWindowInfo { SResultRowPosition pos; - uint64_t groupId; + uint64_t groupId; } SOpenWindowInfo; static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); -static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId); +static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, + uint64_t groupId); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); ///* @@ -601,15 +602,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExprSupp* pSup = &pOperatorInfo->exprSupp; - int32_t startPos = 0; - int32_t numOfOutput = pSup->numOfExprs; + int32_t startPos = 0; + int32_t numOfOutput = pSup->numOfExprs; SResultRow* pResult = NULL; while (1) { - SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); - SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data; - uint64_t groupId = pOpenWin->groupId; + SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); + SOpenWindowInfo* pOpenWin = (SOpenWindowInfo*)pn->data; + uint64_t groupId = pOpenWin->groupId; SResultRowPosition* p1 = &pOpenWin->pos; if (p->pageId == p1->pageId && p->offset == p1->offset) { break; @@ -1028,7 +1029,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); // window start(end) key interpolation doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); - //TODO: add to open window? how to close the open windows after input blocks exhausted? + // TODO: add to open window? how to close the open windows after input blocks exhausted? #if 0 if ((ascScan && ekey <= pBlock->info.window.ekey) || (!ascScan && ekey >= pBlock->info.window.skey)) { @@ -1062,13 +1063,13 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe openWin.pos.pageId = pResult->pageId; openWin.pos.offset = pResult->offset; openWin.groupId = groupId; - SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); + SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); if (pn == NULL) { tdListAppend(pResultRowInfo->openWindow, &openWin); return openWin.pos; } - SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data; + SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data; if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) { tdListAppend(pResultRowInfo->openWindow, &openWin); } @@ -1424,7 +1425,8 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) return true; } -void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) { +void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, + SHashObj* pUpdatedMap) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsStarts = (TSKEY*)pStartCol->pData; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -1469,7 +1471,6 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* } static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { - void* pIte = NULL; size_t keyLen = 0; int32_t iter = 0; @@ -1826,7 +1827,8 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, int64_t waterMark) { +void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, + int64_t waterMark) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, waterMark); return; @@ -1921,7 +1923,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) { - initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark); + initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval, + pInfo->twAggSup.waterMark); } code = appendDownstream(pOperator, &downstream, 1); @@ -2248,7 +2251,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp if (hasInterp) { pResBlock->info.rows += 1; } - } static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -2721,7 +2723,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi goto _error; } - int32_t num = 0; + int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; @@ -2748,7 +2750,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};; + pInfo->twAggSup = + (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + ; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; @@ -2857,8 +2861,8 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]); - char* p = GET_ROWCELL_INTERBUF(pEntryInfo); - SColumnInfoData idata = {0}; + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + SColumnInfoData idata = {0}; idata.info.type = TSDB_DATA_TYPE_BIGINT; idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; idata.pData = p; @@ -2867,7 +2871,7 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; pDestCtx[k].sfp.process(&tw, 1, &out); pEntryInfo->numOfRes = 1; - }else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { + } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); if (code != TSDB_CODE_SUCCESS) { qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); @@ -2893,7 +2897,8 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { } static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, - int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pUpdatedMap) { + int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, + SHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWinArray); if (!pInfo->pChildren) { return; @@ -2934,7 +2939,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, - GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); + GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); return p1 == NULL; } @@ -3059,7 +3064,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->aggSup.currentPageId = -1; + pInfo->aggSup.currentPageId = -1; } static void clearSpecialDataBlock(SSDataBlock* pBlock) { @@ -3145,7 +3150,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pInfo) { int32_t size = taosArrayGetSize(wins); for (int32_t i = 0; i < size; i++) { - SWinKey* winKey = taosArrayGet(wins, i); + SWinKey* winKey = taosArrayGet(wins, i); STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); if (isCloseWindow(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); @@ -3274,8 +3279,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL); - rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, - pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdatedMap); + rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, + pOperator->pTaskInfo, pUpdatedMap); addRetriveWindow(delWins, pInfo); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -3381,40 +3386,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } -SSDataBlock* createSpecialDataBlock(EStreamType type) { - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->info.hasVarCol = false; - pBlock->info.groupId = 0; - pBlock->info.rows = 0; - pBlock->info.type = type; - pBlock->info.rowSize = - sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); - pBlock->info.watermark = INT64_MIN; - - pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; - infoData.info.bytes = sizeof(TSKEY); - // window start ts - taosArrayPush(pBlock->pDataBlock, &infoData); - // window end ts - taosArrayPush(pBlock->pDataBlock, &infoData); - - infoData.info.type = TSDB_DATA_TYPE_UBIGINT; - infoData.info.bytes = sizeof(uint64_t); - // uid - taosArrayPush(pBlock->pDataBlock, &infoData); - // group id - taosArrayPush(pBlock->pDataBlock, &infoData); - - // calculate start ts - taosArrayPush(pBlock->pDataBlock, &infoData); - // calculate end ts - taosArrayPush(pBlock->pDataBlock, &infoData); - - return pBlock; -} - SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -3709,7 +3680,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); + initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType, + pInfo->primaryTsIndex); code = appendDownstream(pOperator, &downstream, 1); } return pOperator; @@ -3740,12 +3712,14 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) { return isInTimeWindow(&pWinInfo->win, ts, gap); } static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs, int32_t index) { - SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false}; + SResultWindowInfo win = { + .pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false}; return taosArrayInsert(pWinInfos, index, &win); } static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs) { - SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false}; + SResultWindowInfo win = { + .pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false}; return taosArrayPush(pWinInfos, &win); } @@ -4147,8 +4121,8 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It } } -static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, - int32_t numOfOutput, SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) { +static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, int32_t numOfOutput, + SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4161,7 +4135,8 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin uint64_t groupId = pParentWin->groupId; int32_t winIndex = 0; if (needCreate) { - pParentWin = getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex); + pParentWin = + getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex); } setWindowOutputBuf(pParentWin, &pCurResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->streamAggSup, pTaskInfo); @@ -4292,9 +4267,9 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { } int32_t compareWinKey(void* pKey, void* data, int32_t index) { - SArray* res = (SArray*)data; + SArray* res = (SArray*)data; SResKeyPos* pos = taosArrayGetP(res, index); - SWinKey* pData = (SWinKey*)pKey; + SWinKey* pData = (SWinKey*)pKey; if (pData->ts == *(int64_t*)pos->key) { if (pData->groupId > pos->groupId) { return 1; @@ -4317,7 +4292,7 @@ static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) { int32_t num = taosArrayGetSize(update); for (int32_t i = 0; i < num; i++) { SResKeyPos* pos = taosArrayGetP(update, i); - SWinKey winKey = {.ts = *(int64_t*)pos->key, .groupId = pos->groupId}; + SWinKey winKey = {.ts = *(int64_t*)pos->key, .groupId = pos->groupId}; taosHashRemove(pStDeleted, &winKey, sizeof(SWinKey)); } } @@ -4346,7 +4321,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); // SResKeyPos + SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); // SResKeyPos while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -4362,8 +4337,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; - doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, - 0, NULL); + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, + pChildOp->exprSupp.numOfExprs, 0, NULL); rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, NULL, false); } taosArrayDestroy(pWins); @@ -4464,7 +4439,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - + { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { @@ -4856,8 +4831,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl i, &allEqual, pStDeleted); if (!allEqual) { uint64_t uid = 0; - appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, - &uid, &groupId); + appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, &uid, &groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -4886,7 +4860,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamStateAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; - int64_t maxTs = INT64_MIN; + int64_t maxTs = INT64_MIN; if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { @@ -5035,7 +5009,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); + initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType, + pInfo->primaryTsIndex); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5064,8 +5039,8 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui SExprSupp* pSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, - GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( + iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, @@ -5274,11 +5249,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprSupp* pSup = &pOperator->exprSupp; miaInfo->pCondition = pNode->window.node.pConditions; - miaInfo->curTs = INT64_MIN; - iaInfo->win = pTaskInfo->window; - iaInfo->inputOrder = TSDB_ORDER_ASC; - iaInfo->interval = interval; - iaInfo->execModel = pTaskInfo->execModel; + miaInfo->curTs = INT64_MIN; + iaInfo->win = pTaskInfo->window; + iaInfo->inputOrder = TSDB_ORDER_ASC; + iaInfo->interval = interval; + iaInfo->execModel = pTaskInfo->execModel; iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId; iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock; @@ -5301,12 +5276,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, initResultRowInfo(&iaInfo->binfo.resultRowInfo); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); - pOperator->name = "TimeMergeAlignedIntervalAggOperator"; + pOperator->name = "TimeMergeAlignedIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = miaInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = miaInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); @@ -5359,8 +5334,8 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, - GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( + iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); @@ -5585,10 +5560,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo; - pIntervalInfo->win = pTaskInfo->window; + pIntervalInfo->win = pTaskInfo->window; pIntervalInfo->inputOrder = TSDB_ORDER_ASC; - pIntervalInfo->interval = interval; - pIntervalInfo->execModel = pTaskInfo->execModel; + pIntervalInfo->interval = interval; + pIntervalInfo->execModel = pTaskInfo->execModel; pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock; pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -5605,7 +5580,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge initBasicInfo(&pIntervalInfo->binfo, pResBlock); initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); - pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo); if (pIntervalInfo->timeWindowInterpo) { pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); @@ -5616,12 +5590,12 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); - pOperator->name = "TimeMergeIntervalAggOperator"; + pOperator->name = "TimeMergeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pMergeIntervalInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pMergeIntervalInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9d4010f60e..a2a45938e4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -243,6 +243,36 @@ FAIL: return 0; } +int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, + int64_t groupId) { + char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + // TODO: get hash function by hashMethod + uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + taosMemoryFree(ctbName); + bool found = false; + // TODO: optimize search + int32_t j; + for (j = 0; j < vgSz; j++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + ASSERT(pVgInfo->vgId > 0); + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + return -1; + } + if (pReqs[j].blockNum == 0) { + atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + } + pReqs[j].blockNum++; + found = true; + break; + } + } + ASSERT(found); + return 0; +} + int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = -1; int32_t blockNum = taosArrayGetSize(pData->blocks); @@ -317,20 +347,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat for (int32_t i = 0; i < blockNum; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId); - // TODO: get hash function by hashMethod - uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); - - taosMemoryFree(ctbName); - - bool found = false; - // TODO: optimize search - int32_t j; - for (j = 0; j < vgSz; j++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - ASSERT(pVgInfo->vgId > 0); - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + // TODO: do not use broadcast + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + for (int32_t j = 0; j < vgSz; j++) { if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } @@ -338,11 +358,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); } pReqs[j].blockNum++; - found = true; - break; } + continue; + } + + if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.groupId) < 0) { + goto FAIL_SHUFFLE_DISPATCH; } - ASSERT(found); } for (int32_t i = 0; i < vgSz; i++) {