Merge branch 'main' into fix/liaohj_main

commit 1fa4327f6c

@@ -340,6 +340,7 @@ typedef struct SStreamMeta {
   TTB*         pTaskDb;
   TTB*         pCheckpointDb;
   SHashObj*    pTasks;
+  SArray*      pTaskList;  // SArray<task_id*>
   void*        ahandle;
   TXN*         txn;
   FTaskExpand* expandFunc;
@@ -558,10 +558,15 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
   return 0;
 }
 
+#define BOUNDARY 1024
 static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
   int32_t result = 1;
-  while (result <= length) {
-    result *= 2;
+  if (length >= BOUNDARY){
+    result = length;
+  }else{
+    while (result <= length) {
+      result *= 2;
+    }
   }
   if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
     result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
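As an aside, the rounding rule introduced above can be exercised with a small standalone sketch. BOUNDARY and the general shape are taken from the hunk; the helper name and the main() driver are made up for illustration only.

#include <stdint.h>
#include <stdio.h>

#define BOUNDARY 1024

/* Illustrative sketch of the rounding rule above: lengths below BOUNDARY are
 * rounded up to the next power of two, larger lengths are kept as-is so that
 * already-wide columns are not doubled again. */
static int32_t roundUpLength(int32_t length) {
  if (length >= BOUNDARY) {
    return length;
  }
  int32_t result = 1;
  while (result <= length) {
    result *= 2;
  }
  return result;
}

int main(void) {
  // expected output: 8 1024 4096
  printf("%d %d %d\n", roundUpLength(5), roundUpLength(1000), roundUpLength(4096));
  return 0;
}
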
@@ -657,7 +662,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
     len += field->bytes;
   }
   if(len > maxLen){
-    return TSDB_CODE_TSC_INVALID_VALUE;
+    return isTag ? TSDB_CODE_PAR_INVALID_TAGS_LENGTH : TSDB_CODE_PAR_INVALID_ROW_LENGTH;
   }
 
   return TSDB_CODE_SUCCESS;
@@ -2524,6 +2524,9 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
     if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
       metaRsp.numOfColumns = -1;
       metaRsp.suid = pStbVersion->suid;
+      tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
+      tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
+      tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
       taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
       continue;
     }
@@ -153,11 +153,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
 
   // 2.save task
+  taosWLockLatch(&pSnode->pMeta->lock);
   code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
   if (code < 0) {
+    taosWUnLockLatch(&pSnode->pMeta->lock);
     return -1;
   }
 
+  taosWUnLockLatch(&pSnode->pMeta->lock);
+
   // 3.go through recover steps to fill history
   if (pTask->fillHistory) {
     streamSetParamForRecover(pTask);
@@ -781,13 +781,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   tDecoderClear(&decoder);
 
   // 2.save task, use the newest commit version as the initial start version of stream task.
+  taosWLockLatch(&pTq->pStreamMeta->lock);
   code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
   if (code < 0) {
     tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
             streamMetaGetNumOfTasks(pTq->pStreamMeta));
+    taosWUnLockLatch(&pTq->pStreamMeta->lock);
     return -1;
   }
 
+  taosWUnLockLatch(&pTq->pStreamMeta->lock);
+
   // 3.go through recover steps to fill history
   if (pTask->fillHistory) {
     streamTaskCheckDownstream(pTask, sversion);
@@ -1268,7 +1272,8 @@ int32_t tqStartStreamTasks(STQ* pTq) {
   SStreamMeta* pMeta = pTq->pStreamMeta;
 
   taosWLockLatch(&pMeta->lock);
-  int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
   if (numOfTasks == 0) {
     tqInfo("vgId:%d no stream tasks exists", vgId);
     taosWUnLockLatch(&pTq->pStreamMeta->lock);
@@ -22,14 +22,15 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
     tqProcessSubmitReqForSubscribe(pTq);
   }
 
-  tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
+  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
+  tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
 
   // push data for stream processing:
   // 1. the vnode has already been restored.
   // 2. the vnode should be the leader.
   // 3. the stream is not suspended yet.
   if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
-    if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
+    if (numOfTasks == 0) {
       return 0;
     }
 
@@ -1093,6 +1093,5 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
   }
 
   taosWUnLockLatch(&pTq->pStreamMeta->lock);
-
   return 0;
 }
@@ -36,6 +36,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
 
   if (shouldIdle) {
+    taosWLockLatch(&pMeta->lock);
 
     pMeta->walScanCounter -= 1;
     times = pMeta->walScanCounter;
 
@@ -56,42 +57,28 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
   return 0;
 }
 
-static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
-  SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
-  void*   pIter = NULL;
-
-  taosWLockLatch(&pStreamMeta->lock);
-  while(1) {
-    pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
-    if (pIter == NULL) {
-      break;
-    }
-
-    SStreamTask* pTask = *(SStreamTask**)pIter;
-    taosArrayPush(pTaskIdList, &pTask->id.taskId);
-  }
-
-  taosWUnLockLatch(&pStreamMeta->lock);
-  return pTaskIdList;
-}
-
 int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
   *pScanIdle = true;
   bool    noNewDataInWal = true;
   int32_t vgId = pStreamMeta->vgId;
 
-  int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks);
+  int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
   if (numOfTasks == 0) {
     return TSDB_CODE_SUCCESS;
   }
 
+  SArray* pTaskList = NULL;
+  taosWLockLatch(&pStreamMeta->lock);
+  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
+  taosWUnLockLatch(&pStreamMeta->lock);
+
   tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
-  SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
 
   // update the new task number
-  numOfTasks = taosArrayGetSize(pTaskIdList);
+  numOfTasks = taosArrayGetSize(pTaskList);
 
   for (int32_t i = 0; i < numOfTasks; ++i) {
-    int32_t*     pTaskId = taosArrayGet(pTaskIdList, i);
+    int32_t*     pTaskId = taosArrayGet(pTaskList, i);
     SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
     if (pTask == NULL) {
       continue;
@@ -165,7 +152,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
     *pScanIdle = true;
   }
 
-  taosArrayDestroy(pTaskIdList);
+  taosArrayDestroy(pTaskList);
   return 0;
 }
 
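The scan-wal change above stops iterating the task hash under the meta lock and instead duplicates the task-id array once, then walks the private copy. Below is a minimal sketch of that copy-then-iterate pattern, written with plain pthreads and a fixed-size array rather than the TDengine helpers; all names here are illustrative.

#include <pthread.h>
#include <string.h>
#include <stdint.h>

typedef struct {
  pthread_rwlock_t lock;
  int32_t          taskIds[64];
  int32_t          numOfTasks;
} TaskMeta;

/* Copy the id list while holding the lock, so the scan itself runs unlocked. */
static int32_t snapshotTaskIds(TaskMeta* pMeta, int32_t* dst, int32_t cap) {
  pthread_rwlock_rdlock(&pMeta->lock);
  int32_t n = (pMeta->numOfTasks < cap) ? pMeta->numOfTasks : cap;
  memcpy(dst, pMeta->taskIds, n * sizeof(int32_t));
  pthread_rwlock_unlock(&pMeta->lock);
  return n;
}

static void scanTasks(TaskMeta* pMeta) {
  int32_t ids[64];
  int32_t n = snapshotTaskIds(pMeta, ids, 64);
  for (int32_t i = 0; i < n; ++i) {
    // look up ids[i]; a task removed in the meantime simply fails the lookup
  }
}
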
@@ -21,10 +21,9 @@
 #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
 
 typedef enum {
-  READER_STATUS_SUSPEND = 0x1,
-  READER_STATUS_SHOULD_STOP = 0x2,
-  READER_STATUS_NORMAL = 0x3,
-} EReaderExecStatus;
+  READER_STATUS_SUSPEND = 0x1,
+  READER_STATUS_NORMAL = 0x2,
+} EReaderStatus;
 
 typedef enum {
   EXTERNAL_ROWS_PREV = 0x1,
@@ -184,6 +183,7 @@ typedef struct STsdbReaderAttr {
   STimeWindow   window;
+  bool          freeBlock;
   SVersionRange verRange;
   int16_t       order;
 } STsdbReaderAttr;
 
 typedef struct SResultBlockInfo {
@@ -196,7 +196,8 @@ struct STsdbReader {
   STsdb*         pTsdb;
   SVersionRange  verRange;
   TdThreadMutex  readerMutex;
-  EReaderExecStatus flag;
+  EReaderStatus  flag;
+  int32_t        code;
   uint64_t       suid;
   int16_t        order;
   EReadMode      readMode;
@@ -2995,9 +2996,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
 
   while (1) {
     // only check here, since the iterate data in memory is very fast.
-    if (pReader->flag == READER_STATUS_SHOULD_STOP) {
-      tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
-      return TSDB_CODE_SUCCESS;
+    if (pReader->code != TSDB_CODE_SUCCESS) {
+      tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
+      return pReader->code;
     }
 
     bool hasNext = false;
@@ -3093,9 +3094,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
   SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
 
   while (1) {
-    if (pReader->flag == READER_STATUS_SHOULD_STOP) {
-      tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
-      return TSDB_CODE_SUCCESS;
+    if (pReader->code != TSDB_CODE_SUCCESS) {
+      tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
+      return pReader->code;
     }
 
     // load the last data block of current table
@@ -3246,7 +3247,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
     }
   }
 
-  return code;
+  return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
 }
 
 static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
@@ -3395,9 +3396,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
   STableUidList* pUidList = &pStatus->uidList;
 
   while (1) {
-    if (pReader->flag == READER_STATUS_SHOULD_STOP) {
-      tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
-      return TSDB_CODE_SUCCESS;
+    if (pReader->code != TSDB_CODE_SUCCESS) {
+      tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
+      return pReader->code;
     }
 
     STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
@@ -3493,7 +3494,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
     terrno = 0;
 
     code = doLoadLastBlockSequentially(pReader);
-    if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
+    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return TSDB_READ_RETURN;
    }
@@ -3507,8 +3508,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
    code = initForFirstBlockInFile(pReader, pBlockIter);
 
    // error happens or all the data files are completely checked
-   if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) ||
-       pReader->flag == READER_STATUS_SHOULD_STOP) {
+   if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
      terrno = code;
      return TSDB_READ_RETURN;
    }
@@ -3536,13 +3536,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
    }
 
    code = doBuildDataBlock(pReader);
-   if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
+   if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) {
      return code;
    }
-
-   if (pResBlock->info.rows > 0) {
-     return TSDB_CODE_SUCCESS;
-   }
  }
 
  while (1) {
@@ -3581,13 +3577,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
      code = doBuildDataBlock(pReader);
    }
 
-   if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
+   if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) {
      return code;
    }
-
-   if (pResBlock->info.rows > 0) {
-     return TSDB_CODE_SUCCESS;
-   }
  }
 }
@@ -4849,8 +4841,8 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
 
  *hasNext = false;
 
- if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
-   return code;
+ if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) {
+   return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
  }
 
  SReaderStatus* pStatus = &pReader->status;
@@ -5456,4 +5448,4 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
  pReader->idStr = taosStrdup(idstr);
 }
 
-void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->flag = READER_STATUS_SHOULD_STOP; }
+void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
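The reader hunks above drop the READER_STATUS_SHOULD_STOP flag in favour of a sticky error code that every long-running loop checks and then returns, so callers learn why the scan ended. A minimal sketch of that cancellation pattern follows, with invented names; it is not the tsdb reader API itself.

#include <stdint.h>
#include <stdatomic.h>

#define CODE_SUCCESS         0
#define CODE_QUERY_CANCELLED 1   /* stand-in for TSDB_CODE_TSC_QUERY_CANCELLED */

typedef struct {
  _Atomic int32_t code;   /* stays CODE_SUCCESS until cancelled or failed */
} Reader;

static void readerCancel(Reader* r) {
  atomic_store(&r->code, CODE_QUERY_CANCELLED);
}

static int32_t readerScan(Reader* r, int32_t nBlocks) {
  for (int32_t i = 0; i < nBlocks; ++i) {
    int32_t code = atomic_load(&r->code);
    if (code != CODE_SUCCESS) {
      return code;   /* propagate the cancel/error code instead of a plain success */
    }
    /* ... load and merge block i ... */
  }
  return CODE_SUCCESS;
}
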
@@ -6666,22 +6666,40 @@ static int32_t createRealTableForGrantTable(SGrantStmt* pStmt, SRealTableNode**
 }
 
 static int32_t translateGrantTagCond(STranslateContext* pCxt, SGrantStmt* pStmt, SAlterUserReq* pReq) {
-  if (NULL == pStmt->pTagCond) {
-    return TSDB_CODE_SUCCESS;
-  }
-  SRealTableNode* pTable = NULL;
   if ('\0' == pStmt->tabName[0] || '*' == pStmt->tabName[0]) {
-    return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
-                                   "The With clause can only be used for table level privilege");
+    if (pStmt->pTagCond) {
+      return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+                                     "The With clause can only be used for table level privilege");
+    } else {
+      return TSDB_CODE_SUCCESS;
+    }
   }
 
-  pCxt->pCurrStmt = (SNode*)pStmt;
+  SRealTableNode* pTable = NULL;
   int32_t code = createRealTableForGrantTable(pStmt, &pTable);
   if (TSDB_CODE_SUCCESS == code) {
     SName name;
     code = getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pTable->table.dbName, pTable->table.tableName, &name),
                             &(pTable->pMeta));
     if (code) {
       nodesDestroyNode((SNode*)pTable);
       return code;
     }
+
+    if (TSDB_SUPER_TABLE != pTable->pMeta->tableType && TSDB_NORMAL_TABLE != pTable->pMeta->tableType) {
+      nodesDestroyNode((SNode*)pTable);
+      return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+                                     "Only supertable and normal table can be granted");
+    }
+  }
+
+  if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTagCond) {
+    nodesDestroyNode((SNode*)pTable);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  pCxt->pCurrStmt = (SNode*)pStmt;
+
+  if (TSDB_CODE_SUCCESS == code) {
+    code = addNamespace(pCxt, pTable);
+  }
@@ -314,6 +314,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
 
+    taosWLockLatch(&pTask->pMeta->lock);
 
     streamMetaSaveTask(pTask->pMeta, pTask);
     if (streamMetaCommit(pTask->pMeta) < 0) {
       taosWUnLockLatch(&pTask->pMeta->lock);
@@ -57,6 +57,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
     goto _err;
   }
 
+  // task list
+  pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
+  if (pMeta->pTaskList == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+
   if (streamMetaBegin(pMeta) < 0) {
     goto _err;
   }
@@ -70,6 +77,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
 _err:
   taosMemoryFree(pMeta->path);
   if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
+  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
   if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
   if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
   if (pMeta->db) tdbClose(pMeta->db);
@@ -100,6 +108,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
   }
 
   taosHashCleanup(pMeta->pTasks);
+  pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
   taosMemoryFree(pMeta->path);
   taosMemoryFree(pMeta);
 }
@@ -180,11 +189,15 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
   }
 
   taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
+  taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
   return 0;
 }
 
 int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
-  return (int32_t) taosHashGetSize(pMeta->pTasks);
+  size_t size = taosHashGetSize(pMeta->pTasks);
+  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
+
+  return (int32_t) size;
 }
 
 SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
@@ -216,12 +229,23 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
   if (ppTask) {
     SStreamTask* pTask = *ppTask;
 
-    taosWLockLatch(&pMeta->lock);
-
     taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
     tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
 
     atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
 
+    taosWLockLatch(&pMeta->lock);
+    int32_t num = taosArrayGetSize(pMeta->pTaskList);
+    for(int32_t i = 0; i < num; ++i) {
+      int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
+      if (*pTaskId == taskId) {
+        taosArrayRemove(pMeta->pTaskList, i);
+        break;
+      }
+    }
+
     streamMetaReleaseTask(pMeta, pTask);
     taosWUnLockLatch(&pMeta->lock);
   }
@@ -306,6 +330,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
       return -1;
     }
 
+    taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
+
     if (pTask->fillHistory) {
       pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
       streamTaskCheckDownstream(pTask, ver);
@@ -1814,6 +1814,11 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
 
     *ppVal = pVal;
     *vLen = cd.vLen;
+  } else {
+    if (TDB_CELLDECODER_FREE_VAL(&cd)) {
+      tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal);
+      tdbFree(cd.pVal);
+    }
   }
 
   ret = tdbBtcMoveToNext(pBtc);
@@ -171,7 +171,7 @@ void taosGetSystemLocale(char *outLocale, char *outCharset) {
     strcpy(outLocale, "en_US.UTF-8");
   } else {
     tstrncpy(outLocale, locale, TD_LOCALE_LEN);
-    printf("locale not configured, set to system default:%s\n", outLocale);
+    //printf("locale not configured, set to system default:%s\n", outLocale);
   }
 
   // if user does not specify the charset, extract it from locale
@@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so
 #define taospy 2.7.6
 pip3 list|grep taospy
 pip3 uninstall taospy -y
-pip3 install taospy==2.7.6
+pip3 install --default-timeout=120 taospy==2.7.6
 
 $TIMEOUT_CMD $cmd
 RET=$?
@@ -1,3 +1,4 @@
+import datetime
 import os
 import socket
 import requests
@@ -238,17 +239,7 @@ def start_taosd():
     start_cmd = 'cd %s && python3 test.py >>/dev/null '%(start_path)
     os.system(start_cmd)
 
-def get_cmds(args_list):
-    # build_path = get_path()
-    # if repo == "community":
-    #     crash_gen_path = build_path[:-5]+"community/tests/pytest/"
-    # elif repo == "TDengine":
-    #     crash_gen_path = build_path[:-5]+"/tests/pytest/"
-    # else:
-    #     pass
-
-    # crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -p -t 10 -s 1000 -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550 '%(crash_gen_path)
-
+def get_cmds(args_list):
     crash_gen_cmd = get_auto_mix_cmds(args_list,valgrind=valgrind_mode)
     return crash_gen_cmd
 
@@ -295,7 +286,7 @@ def check_status():
     elif "Crash_Gen is now exiting with status code: 0" in run_code:
         return 0
-    else:
-        return 2
+    return 2
 
 
 def main():
@@ -310,7 +301,7 @@ def main():
 
 
     build_path = get_path()
+    os.system("pip install git+https://github.com/taosdata/taos-connector-python.git")
 
     if repo =="community":
         crash_gen_path = build_path[:-5]+"community/tests/pytest/"
     elif repo =="TDengine":
@@ -334,7 +325,9 @@ def main():
     if not os.path.exists(run_dir):
         os.mkdir(run_dir)
     print(crash_cmds)
+    starttime = datetime.datetime.now()
     run_crash_gen(crash_cmds)
+    endtime = datetime.datetime.now()
     status = check_status()
 
     print("exit status : ", status)
@@ -349,7 +342,12 @@ def main():
         print('======== crash_gen run sucess and exit as expected ========')
 
     try:
-        text = f"crash_gen instance exit status of docker [ {hostname} ] is : {msg_dict[status]}\n " + f" and git commit : {git_commit}"
+        text = f'''exit status: {msg_dict[status]}
+                   git commit : {git_commit}
+                   hostname: {hostname}
+                   start time: {starttime}
+                   end time: {endtime}
+                   cmd: {crash_cmds}'''
         send_msg(get_msg(text))
     except Exception as e:
         print("exception:", e)
@@ -1,6 +1,7 @@
 #!/usr/bin/python3
 
 
+import datetime
 import os
 import socket
 import requests
@@ -241,15 +242,6 @@ def start_taosd():
     os.system(start_cmd +">>/dev/null")
 
 def get_cmds(args_list):
-    # build_path = get_path()
-    # if repo == "community":
-    #     crash_gen_path = build_path[:-5]+"community/tests/pytest/"
-    # elif repo == "TDengine":
-    #     crash_gen_path = build_path[:-5]+"/tests/pytest/"
-    # else:
-    #     pass
-
-    # crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -p -t 10 -s 1000 -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550 '%(crash_gen_path)
 
     crash_gen_cmd = get_auto_mix_cmds(args_list,valgrind=valgrind_mode)
     return crash_gen_cmd
@@ -343,7 +335,6 @@ def main():
     args = limits(args)
-
     build_path = get_path()
     os.system("pip install git+https://github.com/taosdata/taos-connector-python.git >>/dev/null")
     if repo =="community":
         crash_gen_path = build_path[:-5]+"community/tests/pytest/"
     elif repo =="TDengine":
@@ -368,7 +359,9 @@ def main():
     if not os.path.exists(run_dir):
         os.mkdir(run_dir)
     print(crash_cmds)
+    starttime = datetime.datetime.now()
     run_crash_gen(crash_cmds)
+    endtime = datetime.datetime.now()
     status = check_status()
     # back_path = os.path.join(core_path,"valgrind_report")
 
@@ -384,7 +377,12 @@ def main():
         print('======== crash_gen run sucess and exit as expected ========')
 
     try:
-        text = f"crash_gen instance exit status of docker [ {hostname} ] is : {msg_dict[status]}\n " + f" and git commit : {git_commit}"
+        text = f'''exit status: {msg_dict[status]}
+                   git commit : {git_commit}
+                   hostname: {hostname}
+                   start time: {starttime}
+                   end time: {endtime}
+                   cmd: {crash_cmds}'''
         send_msg(get_msg(text))
     except Exception as e:
         print("exception:", e)
@@ -1,6 +1,7 @@
 #!/usr/bin/python3
 
 
+import datetime
 import os
 import socket
 import requests
@@ -241,16 +242,7 @@ def start_taosd():
     os.system(start_cmd +">>/dev/null")
 
 def get_cmds(args_list):
-    # build_path = get_path()
-    # if repo == "community":
-    #     crash_gen_path = build_path[:-5]+"community/tests/pytest/"
-    # elif repo == "TDengine":
-    #     crash_gen_path = build_path[:-5]+"/tests/pytest/"
-    # else:
-    #     pass
-
-    # crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -p -t 10 -s 1000 -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550 '%(crash_gen_path)
-
 
     crash_gen_cmd = get_auto_mix_cmds(args_list,valgrind=valgrind_mode)
     return crash_gen_cmd
 
@@ -342,8 +334,7 @@ def main():
     args = random_args(args_list)
     args = limits(args)
 
-    build_path = get_path()
     os.system("pip install git+https://github.com/taosdata/taos-connector-python.git >>/dev/null")
     build_path = get_path()
     if repo =="community":
         crash_gen_path = build_path[:-5]+"community/tests/pytest/"
     elif repo =="TDengine":
@@ -368,7 +359,9 @@ def main():
     if not os.path.exists(run_dir):
         os.mkdir(run_dir)
     print(crash_cmds)
+    starttime = datetime.datetime.now()
     run_crash_gen(crash_cmds)
+    endtime = datetime.datetime.now()
     status = check_status()
     # back_path = os.path.join(core_path,"valgrind_report")
 
@@ -384,7 +377,12 @@ def main():
         print('======== crash_gen run sucess and exit as expected ========')
 
     try:
-        text = f"crash_gen instance exit status of docker [ {hostname} ] is : {msg_dict[status]}\n " + f" and git commit : {git_commit}"
+        text = f'''exit status: {msg_dict[status]}
+                   git commit : {git_commit}
+                   hostname: {hostname}
+                   start time: {starttime}
+                   end time: {endtime}
+                   cmd: {crash_cmds}'''
        send_msg(get_msg(text))
     except Exception as e:
        print("exception:", e)