other: merge 3.0

This commit is contained in:
Haojun Liao 2023-05-18 17:28:40 +08:00
commit c4c5bbb65c
21 changed files with 365 additions and 196 deletions

View File

@ -82,7 +82,7 @@ typedef struct STuplePos {
int32_t pageId; int32_t pageId;
int32_t offset; int32_t offset;
}; };
STupleKey streamTupleKey; SWinKey streamTupleKey;
}; };
} STuplePos; } STuplePos;

View File

@ -86,9 +86,8 @@ typedef struct {
int64_t number; int64_t number;
} SStreamStateCur; } SStreamStateCur;
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);

View File

@ -110,14 +110,14 @@ typedef struct {
int64_t ver; int64_t ver;
int32_t* dataRef; int32_t* dataRef;
SPackedData submit; SPackedData submit;
} SStreamDataSubmit2; } SStreamDataSubmit;
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t ver; int64_t ver;
SArray* dataRefs; // SArray<int32_t*> SArray* dataRefs; // SArray<int32_t*>
SArray* submits; // SArray<SPackedSubmit> SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit2; } SStreamMergedSubmit;
typedef struct { typedef struct {
int8_t type; int8_t type;
@ -205,10 +205,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
void* streamQueueNextItem(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
typedef struct { typedef struct {
char* qmsg; char* qmsg;

View File

@ -38,8 +38,8 @@ typedef SList SStreamSnapshot;
typedef TSKEY (*GetTsFun)(void*); typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
TSKEY delMark); GetTsFun fp, void* pFile, TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState);
@ -56,6 +56,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks); int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) { for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j); SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) { if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1; return -1;
} }
} }
@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks); int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) { for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j); SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1; return -1;
} }
} }

View File

@ -147,8 +147,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t vgId); int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta // tqMeta

View File

@ -1296,7 +1296,11 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
}
} }
return 0; return 0;

View File

@ -325,7 +325,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
memcpy(data, pBody, len); memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT); *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) { if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", id); tqError("%s failed to create data submit for stream since out of memory", id);
@ -447,8 +447,8 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) { while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr); pReader->nextBlk, numOfBlocks, idstr);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) { if (pReader->tbIdHash == NULL) {
@ -705,7 +705,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, const char* id) {
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTSchema, sourceIdx, &colVal); tRowGet(pRow, pTSchema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) { if (colVal.cid < pColData->info.colId) {
// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", // tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in
// schema:%d",
// sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); // sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
sourceIdx++; sourceIdx++;
continue; continue;

View File

@ -17,12 +17,12 @@
#include "tmsg.h" #include "tmsg.h"
#include "tq.h" #include "tq.h"
#define MAX_CATCH_NUM 10240 #define MAX_CACHE_TABLE_INFO_NUM 10240
typedef struct STblInfo { typedef struct STableSinkInfo {
uint64_t uid; uint64_t uid;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
} STblInfo; } STableSinkInfo;
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
@ -97,19 +97,21 @@ end:
return ret; return ret;
} }
int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) { static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t)); void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
if (pVal) { if (pVal) {
*pTbl = *(STblInfo**)pVal; *pInfo = *(STableSinkInfo**)pVal;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) { int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) {
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_FAILED;
} }
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
} }
@ -274,7 +276,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
crTblArray = NULL; crTblArray = NULL;
} else { } else {
SSubmitTbData tbData = {0}; SSubmitTbData tbData = {0};
tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end; goto _end;
@ -283,35 +285,35 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.suid = suid; tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version; tbData.sver = pTSchema->version;
STblInfo* pTblMeta = NULL;
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta); STableSinkInfo* pTableSinkInfo = NULL;
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
if (res != TSDB_CODE_SUCCESS) { if (res != TSDB_CODE_SUCCESS) {
pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo)); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo));
} }
char* ctbName = pDataBlock->info.parTbName; char* ctbName = pDataBlock->info.parTbName;
if (!ctbName[0]) { if (!ctbName[0]) {
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName)); memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
} else { } else {
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
memcpy(ctbName, tmp, strlen(tmp)); memcpy(ctbName, tmp, strlen(tmp));
memcpy(pTblMeta->tbName, tmp, strlen(tmp)); memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
taosMemoryFree(tmp); taosMemoryFree(tmp);
tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId); pDataBlock->info.id.groupId);
} }
} }
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
tbData.uid = pTblMeta->uid; tbData.uid = pTableSinkInfo->uid;
} else { } else {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0); metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) { if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(pTblMeta); taosMemoryFree(pTableSinkInfo);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq* pCreateTbReq = NULL; SVCreateTbReq* pCreateTbReq = NULL;
@ -371,7 +373,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
mr.me.type); mr.me.type);
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(pTblMeta); taosMemoryFree(pTableSinkInfo);
continue; continue;
} }
@ -380,13 +382,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
", actual suid %" PRId64 "", ", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(pTblMeta); taosMemoryFree(pTableSinkInfo);
continue; continue;
} }
tbData.uid = mr.me.uid; tbData.uid = mr.me.uid;
pTblMeta->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid;
tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta); int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);
}
metaReaderClear(&mr); metaReaderClear(&mr);
} }
} }

View File

@ -18,7 +18,8 @@
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
#define NO_POLL_CNT 5 #define NO_POLL_CNT 5
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0}; char buf[128] = {0};
@ -103,7 +104,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal); tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId); consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
return 0; return 0;
} else { } else {
@ -152,7 +154,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return code; return code;
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, vgId, pRequest->subKey); pHandle->subKey, consumerId, vgId, pRequest->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1; return -1;
@ -196,11 +199,11 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// NOTE: this pHandle->consumerId may have been changed already. // NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end: end : {
{
char buf[80] = {0}; char buf[80] = {0};
tFormatOffset(buf, 80, &dataRsp.rspOffset); tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
" code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
} }
@ -208,7 +211,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
return code; return code;
} }
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0; int code = 0;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL; SWalCkHead* pCkHead = NULL;
@ -272,8 +276,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
SWalCont* pHead = &pCkHead->head; SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
pRequest->epoch, vgId, fetchVer, pHead->msgType); pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
// process meta // process meta
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
@ -301,7 +305,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
if (code < 0) { if (code < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
goto end; goto end;
} }
@ -363,7 +368,8 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int
pMsgHead->walever = ever; pMsgHead->walever = ever;
} }
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId) { int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
int32_t vgId) {
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code); tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);

View File

@ -299,7 +299,6 @@ typedef struct SPartitionBySupporter {
typedef struct SPartitionDataInfo { typedef struct SPartitionDataInfo {
uint64_t groupId; uint64_t groupId;
char* tbname; char* tbname;
SArray* tags;
SArray* rowIds; SArray* rowIds;
} SPartitionDataInfo; } SPartitionDataInfo;

View File

@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
return pBlock; return pBlock;
} }
void freePartItem(void* ptr) {
SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr;
taosArrayDestroy(pPart->rowIds);
}
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
pInfo->tsColIndex = 0; pInfo->tsColIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);

View File

@ -2674,6 +2674,29 @@ TSKEY compareTs(void* pKey) {
return pWinKey->ts; return pWinKey->ts;
} }
int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) {
if (pCtx->subsidiaries.rowLen == 0) {
int32_t rowLen = 0;
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
rowLen += pc->pExpr->base.resSchema.bytes;
}
return rowLen + pCtx->subsidiaries.num * sizeof(bool);
} else {
return pCtx->subsidiaries.rowLen;
}
}
int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
int32_t size = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
size = TMAX(size, resSize);
}
return size;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -2720,8 +2743,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pInfo->pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -2730,10 +2756,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->numOfChild = numOfChild; pInfo->numOfChild = numOfChild;
@ -2766,7 +2788,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
@ -4889,9 +4912,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pInfo->pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -4912,10 +4939,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
pInfo->pPhyNode = NULL; // create new child pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL; pInfo->pPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo pInfo->pPullWins = NULL; // SPullWindowInfo
@ -4928,7 +4951,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, int32_t funResSize= getMaxFunResSize(pSup, numOfCols);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,

View File

@ -883,10 +883,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
} }
pStart += pDstCol->info.bytes; pStart += pDstCol->info.bytes;
} }
if (pCtx->saveHandle.pState) {
streamFreeVal((void*)p);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3123,7 +3119,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf; return buf;
} }
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key, static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key,
STuplePos* pPos) { STuplePos* pPos) {
STuplePos p = {0}; STuplePos p = {0};
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
@ -3171,7 +3167,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
prepareBuf(pCtx); prepareBuf(pCtx);
STupleKey key; SWinKey key;
if (pCtx->saveHandle.pBuf == NULL) { if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
@ -3179,7 +3175,6 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
key.groupId = pSrcBlock->info.id.groupId; key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey; key.ts = skey;
key.exprIdx = pCtx->exprIdx;
} }
} }

View File

@ -96,8 +96,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
} }
int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
@ -294,23 +294,26 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, px->submit.msgLen, px->submit.ver, numOfBlocks, size);
total, size);
streamDataSubmitDestroy(pSubmitBlock); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
taosFreeQitem(pSubmitBlock); qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1; return -1;
} }
taosWriteQitem(pTask->inputQueue->queue, pItem);
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
@ -331,10 +334,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
} }
#if 0
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0; return 0;
} }
@ -356,6 +355,4 @@ void* streamQueueNextItem(SStreamQueue* queue) {
} }
} }
void streamTaskInputFail(SStreamTask* pTask) { void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

View File

@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0; return 0;
} }
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
if (pDataSubmit == NULL) { if (pDataSubmit == NULL) {
return NULL; return NULL;
} }
@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
return NULL; return NULL;
} }
pDataSubmit->submit = submit; pDataSubmit->submit = *pData;
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1 *pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = type; pDataSubmit->type = type;
return pDataSubmit; return pDataSubmit;
} }
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
} }
} }
SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
if (pMerged == NULL) { if (pMerged == NULL) {
return NULL; return NULL;
} }
@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() {
return pMerged; return pMerged;
} }
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->submits, &pSubmit->submit); taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver; pMerged->ver = pSubmit->ver;
return 0; return 0;
} }
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1); atomic_add_fetch_32(pDataSubmit->dataRef, 1);
} }
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
int32_t len = 0; int32_t len = 0;
if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
len = pSubmit->submit.msgLen; len = pSubmit->submit.msgLen;
} }
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return NULL; return NULL;
} }
streamDataSubmitRefInc(pSubmit); streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone; return pSubmitClone;
} }
@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem(pElem); taosFreeQitem(pElem);
return dst; return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
streamMergeSubmit(pMerged, pBlockSrc); streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(pElem); taosFreeQitem(pElem);
return dst; return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
// todo handle error // todo handle error
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst); taosFreeQitem(dst);
taosFreeQitem(pElem); taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged; return (SStreamQueueItem*)pMerged;
@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitDestroy((SStreamDataSubmit2*)data); streamDataSubmitDestroy((SStreamDataSubmit*)data);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) { } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->submits); int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);

View File

@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* pBlockList = pMerged->submits; SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
taosArrayDestroy(pRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0; return 0;
} }
@ -269,6 +269,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status)) {
return 0;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
@ -367,11 +370,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qRes->blocks = pRes; qRes->blocks = pRes;
if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver; qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver; qRes->sourceVer = pMerged->ver;
} }
@ -408,7 +411,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr); qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
(!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask); streamSchedExec(pTask);
} }
} }

View File

@ -106,3 +106,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
return (SStreamQueueRes){0}; return (SStreamQueueRes){0};
} }
#endif #endif
#define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16
// todo refactor:
// read data from input queue
typedef struct SQueueReader {
SStreamQueue* pQueue;
int32_t taskLevel;
int32_t maxBlocks; // maximum block in one batch
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
int32_t numOfBlocks = 0;
int32_t tryCount = 0;
SStreamQueueItem* pRet = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue);
if (qItem == NULL) {
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
tryCount++;
taosMsleep(1);
qDebug("===stream===try again batchSize:%d", numOfBlocks);
continue;
}
qDebug("===stream===break batchSize:%d", numOfBlocks);
break;
}
if (pRet == NULL) {
pRet = qItem;
streamQueueProcessSuccess(pReader->pQueue);
if (pReader->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) {
streamQueueProcessFail(pReader->pQueue);
break;
} else {
numOfBlocks++;
pRet = newRet;
streamQueueProcessSuccess(pReader->pQueue);
if (numOfBlocks > pReader->maxBlocks) {
qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr);
break;
}
}
}
}
return pRet;
}

View File

@ -272,26 +272,30 @@ int32_t streamStateCommit(SStreamState* pState) {
#endif #endif
} }
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return streamStateFuncPut_rocksdb(pState, key, value, vLen); void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
#else #else
return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn); return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif #endif
} }
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen); void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
return code;
#else #else
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
#endif
}
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
return streamStateFuncDel_rocksdb(pState, key);
#else
return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
#endif #endif
} }

View File

@ -32,6 +32,7 @@ struct SStreamFileState {
SSHashObj* rowBuffMap; SSHashObj* rowBuffMap;
void* pFileStore; void* pFileStore;
int32_t rowSize; int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen; int32_t keyLen;
uint64_t preCheckPointVersion; uint64_t preCheckPointVersion;
uint64_t checkPointVersion; uint64_t checkPointVersion;
@ -45,8 +46,8 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo; typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
TSKEY delMark) { GetTsFun fp, void* pFile, TSKEY delMark) {
if (memSize <= 0) { if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
} }
@ -58,6 +59,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState) { if (!pFileState) {
goto _error; goto _error;
} }
rowSize += selectRowSize;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES);
@ -69,11 +71,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
} }
pFileState->keyLen = keySize; pFileState->keyLen = keySize;
pFileState->rowSize = rowSize; pFileState->rowSize = rowSize;
pFileState->selectivityRowSize = selectRowSize;
pFileState->preCheckPointVersion = 0; pFileState->preCheckPointVersion = 0;
pFileState->checkPointVersion = 1; pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile; pFileState->pFileStore = pFile;
pFileState->getTs = fp; pFileState->getTs = fp;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->curRowCount = 0; pFileState->curRowCount = 0;
pFileState->deleteMark = delMark; pFileState->deleteMark = delMark;
pFileState->flushMark = INT64_MIN; pFileState->flushMark = INT64_MIN;
@ -441,9 +443,12 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark; : pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark); deleteExpiredCheckPoint(pFileState, mark);
}
void* pStVal = NULL; void* pStVal = NULL;
int32_t len = 0; int32_t len = 0;
@ -479,3 +484,5 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }

View File

@ -307,4 +307,63 @@ sql resume stream IF EXISTS streams66666666;
print ===== step 4 over print ===== step 4 over
print ===== step5
sql drop stream if exists streams6;
sql drop database if exists test6;
sql create database test6 vgroups 10;
sql use test6;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s);
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
sleep 1000
sql pause stream streams6;
sleep 1000
sql insert into ts1 values(1648791223001,1,12,3,1.0);
sql insert into ts2 values(1648791233001,1,12,3,1.0);
sql resume stream streams6;
sql insert into ts3 values(1648791243001,1,12,3,1.0);
sql insert into ts4 values(1648791253001,1,12,3,1.0);
$loop_count = 0
loop6:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 2 select * from streamt6;
sql select * from streamt6;
if $rows != 5 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
print $data50 $data51 $data52
goto loop6
endi
print ===== step5 over
system sh/stop_dnodes.sh system sh/stop_dnodes.sh