diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 74b7813465..58ec3de32f 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -89,7 +89,7 @@ int32_t scheduleFetchRows(void *pJob, void **data); * @param pJob * @return */ -int32_t scheduleCancelJob(void *pJob); +//int32_t scheduleCancelJob(void *pJob); /** * Free the query job diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 87a3000d09..436593f9d6 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -1322,3 +1322,4 @@ int main(int argc, char** argv) { + \ No newline at end of file diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d478e80af0..4a0d62a06f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -87,68 +87,72 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { int32_t code = 0; - int8_t oriStatus = SCH_GET_JOB_STATUS(pJob); + int8_t oriStatus = 0; -/* - if (oriStatus == newStatus) { - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - - switch (oriStatus) { - case JOB_TASK_STATUS_NULL: - if (newStatus != JOB_TASK_STATUS_EXECUTING - && newStatus != JOB_TASK_STATUS_FAILED - && newStatus != JOB_TASK_STATUS_NOT_START) { - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - - break; - case JOB_TASK_STATUS_NOT_START: - if (newStatus != JOB_TASK_STATUS_CANCELLED) { - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - - break; - case JOB_TASK_STATUS_EXECUTING: - if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED - && newStatus != JOB_TASK_STATUS_FAILED - && newStatus != JOB_TASK_STATUS_CANCELLING - && newStatus != JOB_TASK_STATUS_CANCELLED - && newStatus != JOB_TASK_STATUS_DROPPING) { - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - - break; - case JOB_TASK_STATUS_PARTIAL_SUCCEED: - if (newStatus != JOB_TASK_STATUS_EXECUTING - && newStatus != JOB_TASK_STATUS_SUCCEED - && newStatus != JOB_TASK_STATUS_CANCELLED) { - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - - break; - case JOB_TASK_STATUS_SUCCEED: - case JOB_TASK_STATUS_FAILED: - case JOB_TASK_STATUS_CANCELLING: - if (newStatus != JOB_TASK_STATUS_CANCELLED) { - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - - break; - case JOB_TASK_STATUS_CANCELLED: - case JOB_TASK_STATUS_DROPPING: + while (true) { + oriStatus = SCH_GET_JOB_STATUS(pJob); + + if (oriStatus == newStatus) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - break; - - default: - qError("invalid task status:%d", oriStatus); - return TSDB_CODE_QRY_APP_ERROR; + } + + switch (oriStatus) { + case JOB_TASK_STATUS_NULL: + if (newStatus != JOB_TASK_STATUS_NOT_START) { + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_NOT_START: + if (newStatus != JOB_TASK_STATUS_EXECUTING) { + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_EXECUTING: + if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_CANCELLING + && newStatus != JOB_TASK_STATUS_CANCELLED + && newStatus != JOB_TASK_STATUS_DROPPING) { + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_PARTIAL_SUCCEED: + if (newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_SUCCEED + && newStatus != JOB_TASK_STATUS_DROPPING) { + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_SUCCEED: + case JOB_TASK_STATUS_FAILED: + case JOB_TASK_STATUS_CANCELLING: + if (newStatus != JOB_TASK_STATUS_DROPPING) { + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_CANCELLED: + case JOB_TASK_STATUS_DROPPING: + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + break; + + default: + SCH_JOB_ELOG("invalid job status:%d", oriStatus); + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + if (oriStatus != atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) { + continue; + } + + SCH_JOB_DLOG("job status updated from %d to %d", oriStatus, newStatus); + + break; } -*/ - - SCH_SET_JOB_STATUS(pJob, newStatus); - - SCH_JOB_DLOG("status updated from %d to %d", oriStatus, newStatus); return TSDB_CODE_SUCCESS; @@ -507,6 +511,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b } +// Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; @@ -515,7 +520,13 @@ int32_t schFetchFromRemote(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - if (atomic_load_ptr(&pJob->res)) + void *res = atomic_load_ptr(&pJob->res); + if (res) { + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_JOB_DLOG("res already fetched, res:%p", res); + return TSDB_CODE_SUCCESS; + } SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); @@ -525,12 +536,15 @@ _return: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + schProcessOnJobFailure(pJob, code); + return code; } // Note: no more error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + // if already FAILED, no more processing SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); if (errCode) { @@ -813,7 +827,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); if (NULL == job || NULL == (*job)) { - qError("QID:%"PRIx64" taosHashGet queryId not exist", pParam->queryId); + qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -1147,7 +1161,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); if (size <= 0) { - SCH_TASK_DLOG("empty exec address, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask)); return; } @@ -1157,6 +1171,8 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK); } + + SCH_TASK_DLOG("task has %d exec address", size); } void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { @@ -1331,6 +1347,12 @@ int32_t scheduleFetchRows(void *job, void **data) { SSchJob *pJob = job; int32_t code = 0; + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (status == JOB_TASK_STATUS_DROPPING) { + SCH_JOB_ELOG("job is dropping, status:%d", status); + return TSDB_CODE_SCH_STATUS_ERROR; + } + atomic_add_fetch_32(&pJob->ref, 1); if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { @@ -1345,8 +1367,6 @@ int32_t scheduleFetchRows(void *job, void **data) { SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (status == JOB_TASK_STATUS_FAILED) { *data = atomic_load_ptr(&pJob->res); atomic_store_ptr(&pJob->res, NULL); @@ -1414,6 +1434,10 @@ void scheduleFreeJob(void *job) { return; } + schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); + + SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); + while (true) { int32_t ref = atomic_load_32(&pJob->ref); if (0 == ref) { @@ -1425,6 +1449,8 @@ void scheduleFreeJob(void *job) { } } + SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob)); + if (pJob->status == JOB_TASK_STATUS_EXECUTING) { schCancelJob(pJob); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 70e49e6b45..f75f0ef263 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -38,52 +38,73 @@ namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +void schtInitLogFile() { + const char *defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + tsAsyncLog = 0; + qDebugFlag = 159; + + char temp[128] = {0}; + sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); + if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } + +} + + void schtBuildQueryDag(SQueryDag *dag) { uint64_t qId = 0x0000000000000001; dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SArray *scan = taosArrayInit(1, sizeof(SSubplan)); - SArray *merge = taosArrayInit(1, sizeof(SSubplan)); + SArray *scan = taosArrayInit(1, POINTER_BYTES); + SArray *merge = taosArrayInit(1, POINTER_BYTES); - SSubplan scanPlan = {0}; - SSubplan mergePlan = {0}; + SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan)); + SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan)); - scanPlan.id.queryId = qId; - scanPlan.id.templateId = 0x0000000000000002; - scanPlan.id.subplanId = 0x0000000000000003; - scanPlan.type = QUERY_TYPE_SCAN; - scanPlan.execNode.numOfEps = 1; - scanPlan.execNode.nodeId = 1; - scanPlan.execNode.inUse = 0; - scanPlan.execNode.epAddr[0].port = 6030; - strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0"); - scanPlan.pChildren = NULL; - scanPlan.level = 1; - scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); - scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + scanPlan->id.queryId = qId; + scanPlan->id.templateId = 0x0000000000000002; + scanPlan->id.subplanId = 0x0000000000000003; + scanPlan->type = QUERY_TYPE_SCAN; + scanPlan->execNode.numOfEps = 1; + scanPlan->execNode.nodeId = 1; + scanPlan->execNode.inUse = 0; + scanPlan->execNode.epAddr[0].port = 6030; + strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0"); + scanPlan->pChildren = NULL; + scanPlan->level = 1; + scanPlan->pParents = taosArrayInit(1, POINTER_BYTES); + scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); - mergePlan.id.queryId = qId; - mergePlan.id.templateId = 0x4444444444; - mergePlan.id.subplanId = 0x5555555555; - mergePlan.type = QUERY_TYPE_MERGE; - mergePlan.level = 0; - mergePlan.execNode.numOfEps = 0; - mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES); - mergePlan.pParents = NULL; - mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + mergePlan->id.queryId = qId; + mergePlan->id.templateId = 0x4444444444; + mergePlan->id.subplanId = 0x5555555555; + mergePlan->type = QUERY_TYPE_MERGE; + mergePlan->level = 0; + mergePlan->execNode.numOfEps = 0; + mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); + mergePlan->pParents = NULL; + mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); - taosArrayPush(mergePointer->pChildren, &scanPointer); - taosArrayPush(scanPointer->pParents, &mergePointer); + taosArrayPush(mergePlan->pChildren, &scanPlan); + taosArrayPush(scanPlan->pParents, &mergePlan); taosArrayPush(dag->pSubplans, &merge); taosArrayPush(dag->pSubplans, &scan); } +void schtFreeQueryDag(SQueryDag *dag) { + +} + + void schtBuildInsertDag(SQueryDag *dag) { uint64_t qId = 0x0000000000000002; @@ -138,8 +159,8 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { return 0; } -int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { - return 0; +void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { + } @@ -209,6 +230,9 @@ TEST(queryTest, normalCase) { SVgroupInfo vgInfo = {0}; void *pJob = NULL; SQueryDag dag = {0}; + + schtInitLogFile(); + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); SEpAddr qnodeAddr = {0}; @@ -295,6 +319,8 @@ TEST(queryTest, normalCase) { ASSERT_EQ(data, (void*)NULL); scheduleFreeJob(pJob); + + schtFreeQueryDag(&dag); } @@ -308,6 +334,9 @@ TEST(insertTest, normalCase) { SVgroupInfo vgInfo = {0}; SQueryDag dag = {0}; uint64_t numOfRows = 0; + + schtInitLogFile(); + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); SEpAddr qnodeAddr = {0}; @@ -336,7 +365,11 @@ TEST(insertTest, normalCase) { scheduleFreeJob(pInsertJob); } +TEST(multiThread, forceFree) { + schtInitLogFile(); + +} int main(int argc, char** argv) { diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index e235b0714e..0be17ca2b9 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -1087,7 +1087,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { return 0; } - return -1; + return 0; } char *taosGetCmdlineByPID(int pid) {