Merge branch 'main' into feat/TD-26127-audit-sql

This commit is contained in:
dm chen 2023-09-20 11:13:34 +08:00 committed by GitHub
commit 4497a01168
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 207 additions and 97 deletions

View File

@ -180,6 +180,7 @@ The following list shows all reserved keywords:
- MAX_DELAY
- BWLIMIT
- MAXROWS
- MAX_SPEED
- MERGE
- META
- MINROWS

View File

@ -180,6 +180,7 @@ description: TDengine 保留关键字的详细列表
- MAX_DELAY
- BWLIMIT
- MAXROWS
- MAX_SPEED
- MERGE
- META
- MINROWS

View File

@ -247,6 +247,7 @@ typedef struct SStreamTaskId {
} SStreamTaskId;
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it
@ -661,6 +662,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
int32_t streamSetStatusUnint(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);

View File

@ -2353,27 +2353,26 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList
int32_t maxRows = 0;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL) {
continue;
}
if (!pBoolList) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL) {
continue;
}
int32_t numOfRows = 0;
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
pDst->varmeta.length = 0;
int32_t numOfRows = 0;
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
pDst->varmeta.length = 0;
}
}
}
if (NULL == pBoolList) {
return;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL) {
if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
continue;
}

View File

@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 300;
int32_t tsStreamCheckpointTickInterval = 10;
int32_t tsStreamNodeCheckInterval = 10;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;

View File

@ -83,6 +83,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STREAM,
@ -122,7 +125,7 @@ int32_t mndInitStream(SMnode *pMnode) {
taosThreadMutexInit(&execNodeList.lock, NULL);
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry));
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId));
return sdbSetTable(pMnode->pSdb, table);
}
@ -1173,10 +1176,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosThreadMutexLock(&execNodeList.lock);
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) {
STaskId *p = taosArrayGet(execNodeList.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status));
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false;
break;
}
@ -1278,7 +1286,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
tFreeSMDropStreamReq(&dropReq);
return -1;
}
// mndTransSetSerial(pTrans);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
@ -1305,6 +1312,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
removeStreamTasksInBuf(pStream, &execNodeList);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
@ -1554,13 +1563,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
if (index == NULL) {
STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status);
const char* pStatus = streamGetTaskStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2237,7 +2245,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2251,15 +2259,42 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
if (p == NULL) {
STaskStatusEntry entry = {
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
taosArrayPush(pExecNode->pTaskList, &entry);
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal));
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
}
}
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
break;
}
}
}
}
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
@ -2276,7 +2311,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
// int64_t now = taosGetTimestampSec();
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock);
@ -2287,13 +2321,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (index == NULL) {
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (pEntry == NULL) {
mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId);
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
pStatusEntry->status = p->status;
pEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
}

View File

@ -886,6 +886,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTaskCheckReq req;
SDecoder decoder;
@ -906,10 +907,17 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
// only the leader node handle the check request
if (!pMeta->leader) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
return -1;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
@ -921,7 +929,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {

View File

@ -16,6 +16,8 @@
#include "tq.h"
#include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
@ -95,7 +97,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
}
pTask->taskExecInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
streamSetStatusNormal(pTask);
streamTaskCheckDownstream(pTask);
@ -111,12 +113,9 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
// taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
// taosWUnLockLatch(&pMeta->lock);
return 0;
}
@ -124,7 +123,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
// taosWUnLockLatch(&pMeta->lock);
return -1;
}
@ -135,8 +133,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
// taosWUnLockLatch(&pMeta->lock);
return 0;
}
@ -159,6 +155,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
}
pMeta->walScanCounter += 1;
if (pMeta->walScanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
pMeta->walScanCounter = MAX_REPEAT_SCAN_THRESHOLD;
}
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);

View File

@ -141,6 +141,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
// inputQ, to make sure all blocks with less version have been handled by this task already.
@ -201,6 +202,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
pTask->chkInfo.startTs = taosGetTimestampMs();
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
@ -312,15 +315,19 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId,
el, pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
pTask->id.idStr, remain, pMeta->numOfStreamTasks);
qDebug(
"vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not "
"ready:%d/%d",
pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks);
}
// send check point response to upstream task

View File

@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem == NULL) {
return pStreamBlocks;
}
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->sourceVer = pSubmit->ver;

View File

@ -16,9 +16,10 @@
#include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
destroyStreamDataBlock(pStreamBlocks);
return code;
}
@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));
// current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) {
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
int32_t streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
if ((++numOfBlocks) >= outputBatchSize) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks,
outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
break;
}
}
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = doOutputResultBlockImpl(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock(pStreamBlocks);
return code;
}
size = 0;
} else {
taosArrayDestroy(pRes);
}
@ -525,6 +521,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
int32_t streamExecForAll(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
while (1) {
int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL;
@ -533,9 +532,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks);
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);

View File

@ -294,6 +294,8 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;
}
void streamMetaClose(SStreamMeta* pMeta) {
@ -644,10 +646,12 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
int32_t vgId = pMeta->vgId;
qInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
qError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
return -1;
}
@ -662,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
doClear(pKey, pVal, pCur, pRecycleList);
return -1;
}
@ -672,9 +678,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually", vgId, tsDataDir);
return -1;
}
tDecoderClear(&decoder);
@ -731,6 +736,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
qError("vgId:%d failed to close meta-file cursor", vgId);
taosArrayDestroy(pRecycleList);
return -1;
}
@ -743,6 +749,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);

View File

@ -387,7 +387,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total + 1, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
qDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}
return TSDB_CODE_SUCCESS;

View File

@ -205,21 +205,22 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
const char* id = pTask->id.idStr;
if (stage == -1) {
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr,
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, stage);
return 0;
}
if (pInfo->stage == -1) {
pInfo->stage = stage;
qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr,
qDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id,
upstreamTaskId, stage);
}
if (pInfo->stage < stage) {
qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage);
id, upstreamTaskId, vgId, stage, pInfo->stage);
}
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0;
@ -355,6 +356,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
}
int32_t streamSetStatusUnint(SStreamTask* pTask) {
int32_t status = atomic_load_8(&pTask->status.taskStatus);
if (status == TASK_STATUS__DROPPING) {
qError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr);
return -1;
} else {
qDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT);
return 0;
}
}
// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);

View File

@ -257,7 +257,7 @@ class TDSql:
return self.cursor.istype(col, dataType)
def checkData(self, row, col, data):
def checkData(self, row, col, data, show = False):
if row >= self.queryRows:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row+1, self.queryRows)
@ -275,8 +275,8 @@ class TDSql:
if isinstance(data,str) :
if (len(data) >= 28):
if self.queryResult[row][col] == _parse_ns_timestamp(data):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{pd.to_datetime(resultData)} == expect:{data}")
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
@ -284,7 +284,8 @@ class TDSql:
else:
if self.queryResult[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
@ -317,7 +318,8 @@ class TDSql:
if data == self.queryResult[row][col]:
success = True
if success:
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
@ -328,7 +330,8 @@ class TDSql:
delt_data = data-datetime.datetime.fromtimestamp(0,data.tzinfo)
delt_result = self.queryResult[row][col] - datetime.datetime.fromtimestamp(0,self.queryResult[row][col].tzinfo)
if delt_data == delt_result:
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
@ -341,16 +344,19 @@ class TDSql:
if str(self.queryResult[row][col]) == str(data):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
return
elif isinstance(data, float):
if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
@ -361,7 +367,8 @@ class TDSql:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
tdLog.info("check successfully")
if(show):
tdLog.info("check successfully")
# return true or false replace exit, no print out
def checkRowColNoExit(self, row, col):

View File

@ -109,11 +109,14 @@ class VNode :
# load config
tdLog.info(f' meta-ver file={metaFile}')
if metaFile != "":
jsonVer = jsonFromFile(metaFile)
metaNode = jsonVer["meta"]
self.snapVer = int(metaNode["snapshotVer"])
self.firstVer = int(metaNode["firstVer"])
self.lastVer = int(metaNode["lastVer"])
try:
jsonVer = jsonFromFile(metaFile)
metaNode = jsonVer["meta"]
self.snapVer = int(metaNode["snapshotVer"])
self.firstVer = int(metaNode["firstVer"])
self.lastVer = int(metaNode["lastVer"])
except Exception as e:
tdLog.info(f' read json file except.')
# sort with startVer
self.walFiles = sorted(self.walFiles, key=lambda x : x.startVer, reverse=True)

View File

@ -19,10 +19,39 @@ class TDTestCase:
def check_result(self):
for i in range(self.rowNum):
tdSql.checkData(i, 0, 1);
def full_datatype_test(self):
tdSql.execute("use db;")
sql = "create table db.st(ts timestamp, c1 bool, c2 float, c3 double,c4 tinyint, c5 smallint, c6 int, c7 bigint, c8 tinyint unsigned, c9 smallint unsigned, c10 int unsigned, c11 bigint unsigned) tags( area int);"
tdSql.execute(sql)
sql = "create table db.t1 using db.st tags(1);"
tdSql.execute(sql)
ts = 1694000000000
rows = 126
for i in range(rows):
ts += 1
sql = f"insert into db.t1 values({ts},true,{i},{i},{i%127},{i%32767},{i},{i},{i%127},{i%32767},{i},{i});"
tdSql.execute(sql)
sql = "select diff(ts),diff(c1),diff(c3),diff(c4),diff(c5),diff(c6),diff(c7),diff(c8),diff(c9),diff(c10),diff(c11) from db.t1"
tdSql.query(sql)
tdSql.checkRows(rows - 1)
for i in range(rows - 1):
for j in range(10):
if j == 1: # bool
tdSql.checkData(i, j, 0)
else:
tdSql.checkData(i, j, 1)
def run(self):
tdSql.prepare()
dbname = "db"
# full type test
self.full_datatype_test()
tdSql.execute(
f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)")
tdSql.execute(