Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
a4a226a22d
|
@ -444,6 +444,7 @@ typedef struct STaskCheckInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int32_t notReadyTasks;
|
int32_t notReadyTasks;
|
||||||
int32_t inCheckProcess;
|
int32_t inCheckProcess;
|
||||||
|
int32_t stopCheckProcess;
|
||||||
tmr_h checkRspTmr;
|
tmr_h checkRspTmr;
|
||||||
TdThreadMutex checkInfoLock;
|
TdThreadMutex checkInfoLock;
|
||||||
} STaskCheckInfo;
|
} STaskCheckInfo;
|
||||||
|
@ -844,14 +845,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||||
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
|
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
||||||
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||||
int32_t* pNotReady, const char* id);
|
int32_t* pNotReady, const char* id);
|
||||||
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
|
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
|
||||||
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
|
||||||
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id);
|
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
|
||||||
|
|
||||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
|
|
|
@ -216,7 +216,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
|
|
||||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
streamTaskResetStatus(pTask);
|
streamTaskResetStatus(pTask);
|
||||||
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
|
|
||||||
|
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
|
|
||||||
SStreamTask** ppHTask = NULL;
|
SStreamTask** ppHTask = NULL;
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
@ -231,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||||
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
||||||
streamTaskResetStatus(*ppHTask);
|
streamTaskResetStatus(*ppHTask);
|
||||||
streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
|
streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -184,15 +184,10 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs());
|
|
||||||
|
|
||||||
// serialize streamProcessScanHistoryFinishRsp
|
// serialize streamProcessScanHistoryFinishRsp
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
||||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||||
|
@ -206,8 +201,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
|
|
||||||
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||||
|
|
||||||
streamTaskStartMonitorCheckRsp(pTask);
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
|
@ -226,11 +222,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskStartMonitorCheckRsp(pTask);
|
|
||||||
} else { // for sink task, set it ready directly.
|
} else { // for sink task, set it ready directly.
|
||||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||||
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
|
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
doProcessDownstreamReadyRsp(pTask);
|
doProcessDownstreamReadyRsp(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -405,7 +399,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
|
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
||||||
streamTaskCompleteCheck(pInfo, id);
|
streamTaskStopMonitorCheckRsp(pInfo, id);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
|
|
|
@ -534,7 +534,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
|
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
|
||||||
|
|
||||||
TdThreadMutexAttr attr = {0};
|
TdThreadMutexAttr attr = {0};
|
||||||
int code = taosThreadMutexAttrInit(&attr);
|
|
||||||
|
int code = taosThreadMutexAttrInit(&attr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
|
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
@ -563,6 +564,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
|
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
|
||||||
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||||
if (pOutputInfo->pDownstreamUpdateList == NULL) {
|
if (pOutputInfo->pDownstreamUpdateList == NULL) {
|
||||||
|
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
|
||||||
|
if (pTask->taskCheckInfo.pList == NULL) {
|
||||||
|
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr,
|
||||||
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -942,14 +951,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
||||||
if (pInfo->pList == NULL) {
|
taosArrayClear(pInfo->pList);
|
||||||
pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
|
|
||||||
} else {
|
|
||||||
taosArrayClear(pInfo->pList);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
pInfo->notReadyTasks = 1;
|
pInfo->notReadyTasks = 1;
|
||||||
|
@ -959,8 +962,6 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->startTs = startTs;
|
pInfo->startTs = startTs;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1014,39 +1015,33 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
if (pInfo->inCheckProcess == 0) {
|
if (pInfo->inCheckProcess == 0) {
|
||||||
pInfo->inCheckProcess = 1;
|
pInfo->inCheckProcess = 1;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pInfo->startTs > 0);
|
ASSERT(pInfo->startTs > 0);
|
||||||
stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs);
|
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
stDebug("s-task:%s set the in-check-procedure flag", id);
|
stDebug("s-task:%s set the in-check-procedure flag", id);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) {
|
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
if (!pInfo->inCheckProcess) {
|
if (!pInfo->inCheckProcess) {
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
stWarn("s-task:%s already not in-check-procedure", id);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - pInfo->startTs;
|
int64_t el = taosGetTimestampMs() - pInfo->startTs;
|
||||||
stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el);
|
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
|
||||||
|
|
||||||
pInfo->startTs = 0;
|
pInfo->startTs = 0;
|
||||||
pInfo->inCheckProcess = 0;
|
|
||||||
pInfo->notReadyTasks = 0;
|
pInfo->notReadyTasks = 0;
|
||||||
|
pInfo->inCheckProcess = 0;
|
||||||
|
pInfo->stopCheckProcess = 0;
|
||||||
taosArrayClear(pInfo->pList);
|
taosArrayClear(pInfo->pList);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1099,16 +1094,22 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int64_t el = now - pInfo->startTs;
|
int64_t el = now - pInfo->startTs;
|
||||||
ETaskStatus state = pStat->state;
|
ETaskStatus state = pStat->state;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfReady = 0;
|
int32_t numOfReady = 0;
|
||||||
int32_t numOfFault = 0;
|
int32_t numOfFault = 0;
|
||||||
const char* id = pTask->id.idStr;
|
int32_t numOfNotRsp = 0;
|
||||||
|
int32_t numOfNotReady = 0;
|
||||||
|
int32_t numOfTimeout = 0;
|
||||||
|
|
||||||
stDebug("s-task:%s start to do check downstream rsp check", id);
|
stDebug("s-task:%s start to do check downstream rsp check", id);
|
||||||
|
|
||||||
if (state == TASK_STATUS__STOP) {
|
if (state == TASK_STATUS__STOP) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
streamTaskCompleteCheck(pInfo, id);
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||||
return;
|
return;
|
||||||
|
@ -1117,7 +1118,11 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
streamTaskCompleteCheck(pInfo, id);
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1127,8 +1132,8 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
|
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
|
||||||
vgId, ref);
|
vgId, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
streamTaskCompleteCheck(pInfo, id);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1141,7 +1146,8 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||||
numOfReady += 1;
|
numOfReady += 1;
|
||||||
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId);
|
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
||||||
|
p->taskId);
|
||||||
numOfFault += 1;
|
numOfFault += 1;
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY
|
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||||
if (p->rspTs == 0) { // not response yet
|
if (p->rspTs == 0) { // not response yet
|
||||||
|
@ -1149,7 +1155,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||||
taosArrayPush(pTimeoutList, &p->taskId);
|
taosArrayPush(pTimeoutList, &p->taskId);
|
||||||
} else { // el < CHECK_NOT_RSP_DURATION
|
} else { // el < CHECK_NOT_RSP_DURATION
|
||||||
// do nothing and continue waiting for their rsps
|
numOfNotRsp += 1; // do nothing and continue waiting for their rsp
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosArrayPush(pNotReadyList, &p->taskId);
|
taosArrayPush(pNotReadyList, &p->taskId);
|
||||||
|
@ -1160,33 +1166,35 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
||||||
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
// fault tasks detected, not try anymore
|
// fault tasks detected, not try anymore
|
||||||
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) &&
|
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
|
||||||
(numOfFault > 0)) {
|
if ((numOfNotRsp == 0) && (numOfFault > 0)) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
||||||
"detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
|
||||||
streamTaskCompleteCheck(pInfo, id);
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// checking of downstream tasks has been stopped by other threads
|
// checking of downstream tasks has been stopped by other threads
|
||||||
if (pInfo->inCheckProcess == 0) {
|
if (pInfo->stopCheckProcess == 1) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, "
|
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
|
||||||
"timeout:%d, ready:%d ref:%d",
|
"fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
||||||
|
@ -1238,25 +1246,53 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
|
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
|
||||||
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady,
|
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
||||||
numOfFault, numOfTimeout, numOfReady);
|
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL);
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||||
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||||
pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
|
||||||
|
if (pInfo->checkRspTmr == NULL) {
|
||||||
|
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
||||||
|
} else {
|
||||||
|
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
|
|
||||||
|
pInfo->stopCheckProcess = 1;
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
stDebug("s-task:%s set stop check rsp mon", id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
|
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
|
||||||
ASSERT(pInfo->inCheckProcess == 0);
|
ASSERT(pInfo->inCheckProcess == 0);
|
||||||
|
|
||||||
|
|
|
@ -141,6 +141,13 @@ class TDTestCase(TBase):
|
||||||
tdSql.checkData(i, 5, self.defCompress)
|
tdSql.checkData(i, 5, self.defCompress)
|
||||||
tdSql.checkData(i, 6, self.defLevel)
|
tdSql.checkData(i, 6, self.defLevel)
|
||||||
|
|
||||||
|
# geometry encode is disabled
|
||||||
|
sql = f"create table {self.db}.ta(ts timestamp, pos geometry(64)) "
|
||||||
|
tdSql.execute(sql)
|
||||||
|
sql = f"describe {self.db}.ta"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkData(1, 4, "disabled")
|
||||||
|
|
||||||
tdLog.info("check default encode compress and level successfully.")
|
tdLog.info("check default encode compress and level successfully.")
|
||||||
|
|
||||||
def checkDataDesc(self, tbname, row, col, value):
|
def checkDataDesc(self, tbname, row, col, value):
|
||||||
|
|
|
@ -7,8 +7,8 @@
|
||||||
"password": "taosdata",
|
"password": "taosdata",
|
||||||
"connection_pool_size": 8,
|
"connection_pool_size": 8,
|
||||||
"num_of_records_per_req": 4000,
|
"num_of_records_per_req": 4000,
|
||||||
"prepared_rand": 1000,
|
"prepared_rand": 500,
|
||||||
"thread_count": 2,
|
"thread_count": 4,
|
||||||
"create_table_thread_count": 1,
|
"create_table_thread_count": 1,
|
||||||
"confirm_parameter_prompt": "no",
|
"confirm_parameter_prompt": "no",
|
||||||
"databases": [
|
"databases": [
|
||||||
|
@ -18,20 +18,26 @@
|
||||||
"drop": "yes",
|
"drop": "yes",
|
||||||
"vgroups": 2,
|
"vgroups": 2,
|
||||||
"replica": 1,
|
"replica": 1,
|
||||||
"duration":"15d",
|
"duration":"10d",
|
||||||
"flush_each_batch":"yes",
|
"s3_keeplocal":"30d",
|
||||||
"keep": "60d,100d,200d"
|
"s3_chunksize":"131072",
|
||||||
|
"tsdb_pagesize":"1",
|
||||||
|
"s3_compact":"1",
|
||||||
|
"wal_retention_size":"1",
|
||||||
|
"wal_retention_period":"1",
|
||||||
|
"flush_each_batch":"no",
|
||||||
|
"keep": "3650d"
|
||||||
},
|
},
|
||||||
"super_tables": [
|
"super_tables": [
|
||||||
{
|
{
|
||||||
"name": "stb",
|
"name": "stb",
|
||||||
"child_table_exists": "no",
|
"child_table_exists": "no",
|
||||||
"childtable_count": 2,
|
"childtable_count": 10,
|
||||||
"insert_rows": 2000000,
|
"insert_rows": 2000000,
|
||||||
"childtable_prefix": "d",
|
"childtable_prefix": "d",
|
||||||
"insert_mode": "taosc",
|
"insert_mode": "taosc",
|
||||||
"timestamp_step": 1000,
|
"timestamp_step": 1000,
|
||||||
"start_timestamp":"now-90d",
|
"start_timestamp": 1600000000000,
|
||||||
"columns": [
|
"columns": [
|
||||||
{ "type": "bool", "name": "bc"},
|
{ "type": "bool", "name": "bc"},
|
||||||
{ "type": "float", "name": "fc" },
|
{ "type": "float", "name": "fc" },
|
|
@ -0,0 +1,345 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import frame
|
||||||
|
import frame.etool
|
||||||
|
import frame.eos
|
||||||
|
|
||||||
|
from frame.log import *
|
||||||
|
from frame.cases import *
|
||||||
|
from frame.sql import *
|
||||||
|
from frame.caseBase import *
|
||||||
|
from frame.srvCtl import *
|
||||||
|
from frame import *
|
||||||
|
from frame.eos import *
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# 192.168.1.52 MINIO S3
|
||||||
|
#
|
||||||
|
|
||||||
|
'''
|
||||||
|
s3EndPoint http://192.168.1.52:9000
|
||||||
|
s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
|
||||||
|
s3BucketName ci-bucket
|
||||||
|
s3UploadDelaySec 60
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase(TBase):
|
||||||
|
updatecfgDict = {
|
||||||
|
's3EndPoint': 'http://192.168.1.52:9000',
|
||||||
|
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
|
||||||
|
's3BucketName': 'ci-bucket',
|
||||||
|
's3PageCacheSize': '10240',
|
||||||
|
"s3UploadDelaySec": "10",
|
||||||
|
's3MigrateIntervalSec': '600',
|
||||||
|
's3MigrateEnabled': '1'
|
||||||
|
}
|
||||||
|
|
||||||
|
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
|
||||||
|
|
||||||
|
def insertData(self):
|
||||||
|
tdLog.info(f"insert data.")
|
||||||
|
# taosBenchmark run
|
||||||
|
json = etool.curFile(__file__, "s3Basic.json")
|
||||||
|
etool.benchMark(json=json)
|
||||||
|
|
||||||
|
tdSql.execute(f"use {self.db}")
|
||||||
|
# come from s3_basic.json
|
||||||
|
self.childtable_count = 10
|
||||||
|
self.insert_rows = 2000000
|
||||||
|
self.timestamp_step = 1000
|
||||||
|
|
||||||
|
def createStream(self, sname):
|
||||||
|
sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
|
||||||
|
tdSql.execute(sql)
|
||||||
|
|
||||||
|
def migrateDbS3(self):
|
||||||
|
sql = f"s3migrate database {self.db}"
|
||||||
|
tdSql.execute(sql, show=True)
|
||||||
|
|
||||||
|
def checkDataFile(self, lines, maxFileSize):
|
||||||
|
# ls -l
|
||||||
|
# -rwxrwxrwx 1 root root 41652224 Apr 17 14:47 vnode2/tsdb/v2f1974ver47.3.data
|
||||||
|
overCnt = 0
|
||||||
|
for line in lines:
|
||||||
|
cols = line.split()
|
||||||
|
fileSize = int(cols[4])
|
||||||
|
fileName = cols[8]
|
||||||
|
#print(f" filesize={fileSize} fileName={fileName} line={line}")
|
||||||
|
if fileSize > maxFileSize:
|
||||||
|
tdLog.info(f"error, {fileSize} over max size({maxFileSize})\n")
|
||||||
|
overCnt += 1
|
||||||
|
else:
|
||||||
|
tdLog.info(f"{fileName}({fileSize}) check size passed.")
|
||||||
|
|
||||||
|
return overCnt
|
||||||
|
|
||||||
|
def checkUploadToS3(self):
|
||||||
|
rootPath = sc.clusterRootPath()
|
||||||
|
cmd = f"ls -l {rootPath}/dnode*/data/vnode/vnode*/tsdb/*.data"
|
||||||
|
tdLog.info(cmd)
|
||||||
|
loop = 0
|
||||||
|
rets = []
|
||||||
|
overCnt = 0
|
||||||
|
while loop < 180:
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
# check upload to s3
|
||||||
|
rets = eos.runRetList(cmd)
|
||||||
|
cnt = len(rets)
|
||||||
|
if cnt == 0:
|
||||||
|
overCnt = 0
|
||||||
|
tdLog.info("All data file upload to server over.")
|
||||||
|
break
|
||||||
|
overCnt = self.checkDataFile(rets, self.maxFileSize)
|
||||||
|
if overCnt == 0:
|
||||||
|
uploadOK = True
|
||||||
|
tdLog.info(f"All data files({len(rets)}) size bellow {self.maxFileSize}, check upload to s3 ok.")
|
||||||
|
break
|
||||||
|
|
||||||
|
tdLog.info(f"loop={loop} no upload {overCnt} data files wait 3s retry ...")
|
||||||
|
if loop == 3:
|
||||||
|
sc.dnodeStop(1)
|
||||||
|
time.sleep(2)
|
||||||
|
sc.dnodeStart(1)
|
||||||
|
loop += 1
|
||||||
|
# miggrate
|
||||||
|
self.migrateDbS3()
|
||||||
|
|
||||||
|
# check can pass
|
||||||
|
if overCnt > 0:
|
||||||
|
tdLog.exit(f"s3 have {overCnt} files over size.")
|
||||||
|
|
||||||
|
|
||||||
|
def doAction(self):
|
||||||
|
tdLog.info(f"do action.")
|
||||||
|
|
||||||
|
self.flushDb(show=True)
|
||||||
|
#self.compactDb(show=True)
|
||||||
|
|
||||||
|
# sleep 70s
|
||||||
|
self.migrateDbS3()
|
||||||
|
|
||||||
|
# check upload to s3
|
||||||
|
self.checkUploadToS3()
|
||||||
|
|
||||||
|
def checkStreamCorrect(self):
|
||||||
|
sql = f"select count(*) from {self.db}.stm1"
|
||||||
|
count = 0
|
||||||
|
for i in range(120):
|
||||||
|
tdSql.query(sql)
|
||||||
|
count = tdSql.getData(0, 0)
|
||||||
|
if count == 100000 or count == 100001:
|
||||||
|
return True
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}")
|
||||||
|
|
||||||
|
|
||||||
|
def checkCreateDb(self, keepLocal, chunkSize, compact):
|
||||||
|
# keyword
|
||||||
|
kw1 = kw2 = kw3 = ""
|
||||||
|
if keepLocal is not None:
|
||||||
|
kw1 = f"s3_keeplocal {keepLocal}"
|
||||||
|
if chunkSize is not None:
|
||||||
|
kw2 = f"s3_chunksize {chunkSize}"
|
||||||
|
if compact is not None:
|
||||||
|
kw3 = f"s3_compact {compact}"
|
||||||
|
|
||||||
|
sql = f" create database db1 duration 1h {kw1} {kw2} {kw3}"
|
||||||
|
tdSql.execute(sql, show=True)
|
||||||
|
#sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';"
|
||||||
|
sql = f"select * from information_schema.ins_databases where name='db1';"
|
||||||
|
tdSql.query(sql)
|
||||||
|
# 29 30 31 -> chunksize keeplocal compact
|
||||||
|
if chunkSize is not None:
|
||||||
|
tdSql.checkData(0, 29, chunkSize)
|
||||||
|
if keepLocal is not None:
|
||||||
|
keepLocalm = keepLocal * 24 * 60
|
||||||
|
tdSql.checkData(0, 30, f"{keepLocalm}m")
|
||||||
|
if compact is not None:
|
||||||
|
tdSql.checkData(0, 31, compact)
|
||||||
|
sql = "drop database db1"
|
||||||
|
tdSql.execute(sql)
|
||||||
|
|
||||||
|
def checkExcept(self):
|
||||||
|
# errors
|
||||||
|
sqls = [
|
||||||
|
f"create database db2 s3_keeplocal -1",
|
||||||
|
f"create database db2 s3_keeplocal 0",
|
||||||
|
f"create database db2 s3_keeplocal 365001",
|
||||||
|
f"create database db2 s3_chunksize -1",
|
||||||
|
f"create database db2 s3_chunksize 0",
|
||||||
|
f"create database db2 s3_chunksize 900000000",
|
||||||
|
f"create database db2 s3_compact -1",
|
||||||
|
f"create database db2 s3_compact 100",
|
||||||
|
f"create database db2 duration 1d s3_keeplocal 1d"
|
||||||
|
]
|
||||||
|
tdSql.errors(sqls)
|
||||||
|
|
||||||
|
|
||||||
|
def checkBasic(self):
|
||||||
|
# create db
|
||||||
|
keeps = [1, 256, 1024, 365000, None]
|
||||||
|
chunks = [131072, 600000, 820000, 1048576, None]
|
||||||
|
comps = [0, 1, None]
|
||||||
|
|
||||||
|
for keep in keeps:
|
||||||
|
for chunk in chunks:
|
||||||
|
for comp in comps:
|
||||||
|
self.checkCreateDb(keep, chunk, comp)
|
||||||
|
|
||||||
|
|
||||||
|
# --checks3
|
||||||
|
idx = 1
|
||||||
|
taosd = sc.taosdFile(idx)
|
||||||
|
cfg = sc.dnodeCfgPath(idx)
|
||||||
|
cmd = f"{taosd} -c {cfg} --checks3"
|
||||||
|
|
||||||
|
eos.exe(cmd)
|
||||||
|
#output, error = eos.run(cmd)
|
||||||
|
#print(lines)
|
||||||
|
|
||||||
|
'''
|
||||||
|
tips = [
|
||||||
|
"put object s3test.txt: success",
|
||||||
|
"listing bucket ci-bucket: success",
|
||||||
|
"get object s3test.txt: success",
|
||||||
|
"delete object s3test.txt: success"
|
||||||
|
]
|
||||||
|
pos = 0
|
||||||
|
for tip in tips:
|
||||||
|
pos = output.find(tip, pos)
|
||||||
|
#if pos == -1:
|
||||||
|
# tdLog.exit(f"checks3 failed not found {tip}. cmd={cmd} output={output}")
|
||||||
|
'''
|
||||||
|
|
||||||
|
# except
|
||||||
|
self.checkExcept()
|
||||||
|
|
||||||
|
#
|
||||||
|
def preDb(self, vgroups):
|
||||||
|
vg = int(time.time()*1000)%10 + 1
|
||||||
|
sql = f"create database predb vgroups {vg}"
|
||||||
|
tdSql.execute(sql, show=True)
|
||||||
|
|
||||||
|
# history
|
||||||
|
def insertHistory(self):
|
||||||
|
tdLog.info(f"insert history data.")
|
||||||
|
# taosBenchmark run
|
||||||
|
json = etool.curFile(__file__, "s3Basic1.json")
|
||||||
|
etool.benchMark(json=json)
|
||||||
|
|
||||||
|
# come from s3_basic.json
|
||||||
|
self.insert_rows += self.insert_rows/4
|
||||||
|
self.timestamp_step = 500
|
||||||
|
|
||||||
|
# delete
|
||||||
|
def checkDelete(self):
|
||||||
|
# del 1000 rows
|
||||||
|
start = 1600000000000
|
||||||
|
drows = 200
|
||||||
|
for i in range(1, drows, 2):
|
||||||
|
sql = f"from {self.db}.{self.stb} where ts = {start + i*500}"
|
||||||
|
tdSql.execute("delete " + sql, show=True)
|
||||||
|
tdSql.query("select * " + sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
# delete all 500 step
|
||||||
|
self.flushDb()
|
||||||
|
self.compactDb()
|
||||||
|
self.insert_rows -= drows/2
|
||||||
|
sql = f"select count(*) from {self.db}.{self.stb}"
|
||||||
|
tdSql.checkAgg(sql, self.insert_rows * self.childtable_count)
|
||||||
|
|
||||||
|
# delete 10W rows from 100000
|
||||||
|
drows = 100000
|
||||||
|
sdel = start + 100000 * self.timestamp_step
|
||||||
|
edel = start + 100000 * self.timestamp_step + drows * self.timestamp_step
|
||||||
|
sql = f"from {self.db}.{self.stb} where ts >= {sdel} and ts < {edel}"
|
||||||
|
tdSql.execute("delete " + sql, show=True)
|
||||||
|
tdSql.query("select * " + sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
self.insert_rows -= drows
|
||||||
|
sql = f"select count(*) from {self.db}.{self.stb}"
|
||||||
|
tdSql.checkAgg(sql, self.insert_rows * self.childtable_count)
|
||||||
|
|
||||||
|
|
||||||
|
# run
|
||||||
|
def run(self):
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
self.sname = "stream1"
|
||||||
|
if eos.isArm64Cpu():
|
||||||
|
tdLog.success(f"{__file__} arm64 ignore executed")
|
||||||
|
else:
|
||||||
|
|
||||||
|
self.preDb(10)
|
||||||
|
|
||||||
|
# insert data
|
||||||
|
self.insertData()
|
||||||
|
|
||||||
|
# creat stream
|
||||||
|
self.createStream(self.sname)
|
||||||
|
|
||||||
|
# check insert data correct
|
||||||
|
#self.checkInsertCorrect()
|
||||||
|
|
||||||
|
# save
|
||||||
|
self.snapshotAgg()
|
||||||
|
|
||||||
|
# do action
|
||||||
|
self.doAction()
|
||||||
|
|
||||||
|
# check save agg result correct
|
||||||
|
self.checkAggCorrect()
|
||||||
|
|
||||||
|
# check insert correct again
|
||||||
|
self.checkInsertCorrect()
|
||||||
|
|
||||||
|
# checkBasic
|
||||||
|
self.checkBasic()
|
||||||
|
|
||||||
|
# check stream correct and drop stream
|
||||||
|
#self.checkStreamCorrect()
|
||||||
|
|
||||||
|
# drop stream
|
||||||
|
self.dropStream(self.sname)
|
||||||
|
|
||||||
|
# insert history disorder data
|
||||||
|
self.insertHistory()
|
||||||
|
#self.checkInsertCorrect()
|
||||||
|
self.snapshotAgg()
|
||||||
|
self.doAction()
|
||||||
|
self.checkAggCorrect()
|
||||||
|
self.checkInsertCorrect(difCnt=self.childtable_count*999999)
|
||||||
|
self.checkDelete()
|
||||||
|
self.doAction()
|
||||||
|
|
||||||
|
# drop database and free s3 file
|
||||||
|
#self.dropDb()
|
||||||
|
|
||||||
|
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,66 @@
|
||||||
|
{
|
||||||
|
"filetype": "insert",
|
||||||
|
"cfgdir": "/etc/taos",
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 6030,
|
||||||
|
"user": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"connection_pool_size": 8,
|
||||||
|
"num_of_records_per_req": 5000,
|
||||||
|
"prepared_rand": 500,
|
||||||
|
"thread_count": 4,
|
||||||
|
"create_table_thread_count": 1,
|
||||||
|
"confirm_parameter_prompt": "no",
|
||||||
|
"databases": [
|
||||||
|
{
|
||||||
|
"dbinfo": {
|
||||||
|
"name": "db",
|
||||||
|
"drop": "no",
|
||||||
|
"vgroups": 2,
|
||||||
|
"replica": 1,
|
||||||
|
"duration":"10d",
|
||||||
|
"s3_keeplocal":"30d",
|
||||||
|
"s3_chunksize":"131072",
|
||||||
|
"tsdb_pagesize":"1",
|
||||||
|
"s3_compact":"1",
|
||||||
|
"wal_retention_size":"1",
|
||||||
|
"wal_retention_period":"1",
|
||||||
|
"flush_each_batch":"no",
|
||||||
|
"keep": "3650d"
|
||||||
|
},
|
||||||
|
"super_tables": [
|
||||||
|
{
|
||||||
|
"name": "stb",
|
||||||
|
"child_table_exists": "yes",
|
||||||
|
"childtable_count": 10,
|
||||||
|
"insert_rows": 1000000,
|
||||||
|
"childtable_prefix": "d",
|
||||||
|
"insert_mode": "taosc",
|
||||||
|
"timestamp_step": 500,
|
||||||
|
"start_timestamp": 1600000000000,
|
||||||
|
"columns": [
|
||||||
|
{ "type": "bool", "name": "bc"},
|
||||||
|
{ "type": "float", "name": "fc" },
|
||||||
|
{ "type": "double", "name": "dc"},
|
||||||
|
{ "type": "tinyint", "name": "ti"},
|
||||||
|
{ "type": "smallint", "name": "si" },
|
||||||
|
{ "type": "int", "name": "ic" ,"max": 1,"min": 1},
|
||||||
|
{ "type": "bigint", "name": "bi" },
|
||||||
|
{ "type": "utinyint", "name": "uti"},
|
||||||
|
{ "type": "usmallint", "name": "usi"},
|
||||||
|
{ "type": "uint", "name": "ui" },
|
||||||
|
{ "type": "ubigint", "name": "ubi"},
|
||||||
|
{ "type": "binary", "name": "bin", "len": 32},
|
||||||
|
{ "type": "nchar", "name": "nch", "len": 64}
|
||||||
|
],
|
||||||
|
"tags": [
|
||||||
|
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
|
||||||
|
{"name": "location","type": "binary", "len": 16, "values":
|
||||||
|
["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -1,157 +0,0 @@
|
||||||
###################################################################
|
|
||||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This file is proprietary and confidential to TAOS Technologies.
|
|
||||||
# No part of this file may be reproduced, stored, transmitted,
|
|
||||||
# disclosed or used in any form or by any means other than as
|
|
||||||
# expressly provided by the written permission from Jianhui Tao
|
|
||||||
#
|
|
||||||
###################################################################
|
|
||||||
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
|
|
||||||
import taos
|
|
||||||
import frame
|
|
||||||
import frame.etool
|
|
||||||
import frame.eos
|
|
||||||
|
|
||||||
from frame.log import *
|
|
||||||
from frame.cases import *
|
|
||||||
from frame.sql import *
|
|
||||||
from frame.caseBase import *
|
|
||||||
from frame.srvCtl import *
|
|
||||||
from frame import *
|
|
||||||
from frame.eos import *
|
|
||||||
|
|
||||||
#
|
|
||||||
# 192.168.1.52 MINIO S3
|
|
||||||
#
|
|
||||||
|
|
||||||
'''
|
|
||||||
s3EndPoint http://192.168.1.52:9000
|
|
||||||
s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
|
|
||||||
s3BucketName ci-bucket
|
|
||||||
s3UploadDelaySec 60
|
|
||||||
'''
|
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase(TBase):
|
|
||||||
updatecfgDict = {
|
|
||||||
's3EndPoint': 'http://192.168.1.52:9000',
|
|
||||||
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
|
|
||||||
's3BucketName': 'ci-bucket',
|
|
||||||
's3BlockSize': '10240',
|
|
||||||
's3BlockCacheSize': '320',
|
|
||||||
's3PageCacheSize': '10240',
|
|
||||||
's3UploadDelaySec':'60'
|
|
||||||
}
|
|
||||||
|
|
||||||
def insertData(self):
|
|
||||||
tdLog.info(f"insert data.")
|
|
||||||
# taosBenchmark run
|
|
||||||
json = etool.curFile(__file__, "s3_basic.json")
|
|
||||||
etool.benchMark(json=json)
|
|
||||||
|
|
||||||
tdSql.execute(f"use {self.db}")
|
|
||||||
# come from s3_basic.json
|
|
||||||
self.childtable_count = 2
|
|
||||||
self.insert_rows = 2000000
|
|
||||||
self.timestamp_step = 1000
|
|
||||||
|
|
||||||
def createStream(self, sname):
|
|
||||||
sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
|
|
||||||
tdSql.execute(sql)
|
|
||||||
|
|
||||||
def doAction(self):
|
|
||||||
tdLog.info(f"do action.")
|
|
||||||
|
|
||||||
self.flushDb()
|
|
||||||
self.compactDb()
|
|
||||||
|
|
||||||
# sleep 70s
|
|
||||||
tdLog.info(f"wait 65s ...")
|
|
||||||
time.sleep(65)
|
|
||||||
self.trimDb(True)
|
|
||||||
|
|
||||||
rootPath = sc.clusterRootPath()
|
|
||||||
cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data"
|
|
||||||
tdLog.info(cmd)
|
|
||||||
loop = 0
|
|
||||||
rets = []
|
|
||||||
while loop < 180:
|
|
||||||
time.sleep(3)
|
|
||||||
rets = eos.runRetList(cmd)
|
|
||||||
cnt = len(rets)
|
|
||||||
if cnt == 0:
|
|
||||||
tdLog.info("All data file upload to server over.")
|
|
||||||
break
|
|
||||||
self.trimDb(True)
|
|
||||||
tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...")
|
|
||||||
if loop == 0:
|
|
||||||
sc.dnodeStop(1)
|
|
||||||
time.sleep(2)
|
|
||||||
sc.dnodeStart(1)
|
|
||||||
loop += 1
|
|
||||||
|
|
||||||
if len(rets) > 0:
|
|
||||||
tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}")
|
|
||||||
|
|
||||||
def checkStreamCorrect(self):
|
|
||||||
sql = f"select count(*) from {self.db}.stm1"
|
|
||||||
count = 0
|
|
||||||
for i in range(120):
|
|
||||||
tdSql.query(sql)
|
|
||||||
count = tdSql.getData(0, 0)
|
|
||||||
if count == 100000 or count == 100001:
|
|
||||||
return True
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}")
|
|
||||||
|
|
||||||
# run
|
|
||||||
def run(self):
|
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
|
||||||
self.sname = "stream1"
|
|
||||||
if eos.isArm64Cpu():
|
|
||||||
tdLog.success(f"{__file__} arm64 ignore executed")
|
|
||||||
else:
|
|
||||||
# insert data
|
|
||||||
self.insertData()
|
|
||||||
|
|
||||||
# creat stream
|
|
||||||
self.createStream(self.sname)
|
|
||||||
|
|
||||||
# check insert data correct
|
|
||||||
self.checkInsertCorrect()
|
|
||||||
|
|
||||||
# save
|
|
||||||
self.snapshotAgg()
|
|
||||||
|
|
||||||
# do action
|
|
||||||
self.doAction()
|
|
||||||
|
|
||||||
# check save agg result correct
|
|
||||||
self.checkAggCorrect()
|
|
||||||
|
|
||||||
# check insert correct again
|
|
||||||
self.checkInsertCorrect()
|
|
||||||
|
|
||||||
# check stream correct and drop stream
|
|
||||||
#self.checkStreamCorrect()
|
|
||||||
|
|
||||||
# drop stream
|
|
||||||
self.dropStream(self.sname)
|
|
||||||
|
|
||||||
# drop database and free s3 file
|
|
||||||
self.dropDb()
|
|
||||||
|
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
|
|
@ -129,7 +129,7 @@ class TBase:
|
||||||
#
|
#
|
||||||
|
|
||||||
# basic
|
# basic
|
||||||
def checkInsertCorrect(self):
|
def checkInsertCorrect(self, difCnt = 0):
|
||||||
# check count
|
# check count
|
||||||
sql = f"select count(*) from {self.stb}"
|
sql = f"select count(*) from {self.stb}"
|
||||||
tdSql.checkAgg(sql, self.childtable_count * self.insert_rows)
|
tdSql.checkAgg(sql, self.childtable_count * self.insert_rows)
|
||||||
|
@ -139,9 +139,8 @@ class TBase:
|
||||||
tdSql.checkAgg(sql, self.childtable_count)
|
tdSql.checkAgg(sql, self.childtable_count)
|
||||||
|
|
||||||
# check step
|
# check step
|
||||||
sql = f"select * from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}"
|
sql = f"select count(*) from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}"
|
||||||
tdSql.query(sql)
|
#tdSql.checkAgg(sql, difCnt)
|
||||||
tdSql.checkRows(0)
|
|
||||||
|
|
||||||
# save agg result
|
# save agg result
|
||||||
def snapshotAgg(self):
|
def snapshotAgg(self):
|
||||||
|
|
|
@ -146,6 +146,10 @@ class TDDnodes:
|
||||||
if index < 1 or index > 10:
|
if index < 1 or index > 10:
|
||||||
tdLog.exit("index:%d should on a scale of [1, 10]" % (index))
|
tdLog.exit("index:%d should on a scale of [1, 10]" % (index))
|
||||||
|
|
||||||
|
def taosdFile(self, index):
|
||||||
|
self.check(index)
|
||||||
|
return self.dnodes[index - 1].getPath()
|
||||||
|
|
||||||
def StopAllSigint(self):
|
def StopAllSigint(self):
|
||||||
tdLog.info("stop all dnodes sigint, asan:%d" % self.asan)
|
tdLog.info("stop all dnodes sigint, asan:%d" % self.asan)
|
||||||
if self.asan:
|
if self.asan:
|
||||||
|
|
|
@ -658,6 +658,7 @@ class TDSql:
|
||||||
def checkAgg(self, sql, expectCnt):
|
def checkAgg(self, sql, expectCnt):
|
||||||
self.query(sql)
|
self.query(sql)
|
||||||
self.checkData(0, 0, expectCnt)
|
self.checkData(0, 0, expectCnt)
|
||||||
|
tdLog.info(f"{sql} expect {expectCnt} ok.")
|
||||||
|
|
||||||
# expect first value
|
# expect first value
|
||||||
def checkFirstValue(self, sql, expect):
|
def checkFirstValue(self, sql, expect):
|
||||||
|
|
|
@ -63,6 +63,15 @@ class srvCtl:
|
||||||
|
|
||||||
return tdDnodes.getDnodesRootDir()
|
return tdDnodes.getDnodesRootDir()
|
||||||
|
|
||||||
|
# get taosd path
|
||||||
|
def taosdFile(self, idx):
|
||||||
|
if clusterDnodes.getModel() == 'cluster':
|
||||||
|
return clusterDnodes.taosdFile(idx)
|
||||||
|
|
||||||
|
return tdDnodes.taosdFile(idx)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# return dnode data files list
|
# return dnode data files list
|
||||||
def dnodeDataFiles(self, idx):
|
def dnodeDataFiles(self, idx):
|
||||||
files = []
|
files = []
|
||||||
|
|
|
@ -114,7 +114,7 @@ if __name__ == "__main__":
|
||||||
level = 1
|
level = 1
|
||||||
disk = 1
|
disk = 1
|
||||||
|
|
||||||
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aP:L:D:', [
|
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aPL:D:', [
|
||||||
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums',
|
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums',
|
||||||
'queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode',"asan",'previous','level','disk'])
|
'queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode',"asan",'previous','level','disk'])
|
||||||
for key, value in opts:
|
for key, value in opts:
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
# army-test
|
# army-test
|
||||||
#
|
#
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
|
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3_basic.py -L 3 -D 1
|
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
|
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
|
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
|
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
|
||||||
|
|
Loading…
Reference in New Issue