commit
5747629c9e
|
@ -219,6 +219,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
||||||
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
||||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
||||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
||||||
|
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
||||||
|
|
||||||
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
|
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
|
||||||
SMqRspObj* msg = (SMqRspObj*)res;
|
SMqRspObj* msg = (SMqRspObj*)res;
|
||||||
|
|
|
@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) {
|
||||||
if (pTscObj->pAppInfo) {
|
if (pTscObj->pAppInfo) {
|
||||||
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
|
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
int32_t total = atomic_add_fetch_64((int64_t*)&pSummary->totalRequests, 1);
|
int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
|
||||||
int32_t currentInst = atomic_add_fetch_64((int64_t*)&pSummary->currentRequests, 1);
|
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
|
||||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
||||||
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||||
|
@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) {
|
||||||
static void deregisterRequest(SRequestObj *pRequest) {
|
static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
STscObj * pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
int32_t currentInst = atomic_sub_fetch_64((int64_t*)&pActivity->currentRequests, 1);
|
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
|
||||||
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
||||||
|
|
||||||
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
||||||
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
||||||
" ms, current:%d, app current:%d",
|
" ms, current:%d, app current:%d",
|
||||||
pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst);
|
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
|
||||||
releaseTscObj(pTscObj->id);
|
releaseTscObj(pTscObj->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
|
||||||
return pObj;
|
return pObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
STscObj *acquireTscObj(int64_t rid) {
|
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
|
||||||
return (STscObj *)taosAcquireRef(clientConnRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t releaseTscObj(int64_t rid) {
|
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
|
||||||
return taosReleaseRef(clientConnRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
|
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
|
||||||
assert(pObj != NULL);
|
assert(pObj != NULL);
|
||||||
|
@ -194,7 +190,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
||||||
taosMemoryFreeClear(pResInfo->pRspMsg);
|
taosMemoryFreeClear(pResInfo->pRspMsg);
|
||||||
taosMemoryFreeClear(pResInfo->length);
|
taosMemoryFreeClear(pResInfo->length);
|
||||||
taosMemoryFreeClear(pResInfo->row);
|
taosMemoryFreeClear(pResInfo->row);
|
||||||
|
@ -244,14 +240,9 @@ void destroyRequest(SRequestObj *pRequest) {
|
||||||
taosRemoveRef(clientReqRefPool, pRequest->self);
|
taosRemoveRef(clientReqRefPool, pRequest->self);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj *acquireRequest(int64_t rid) {
|
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
|
||||||
return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t releaseRequest(int64_t rid) {
|
|
||||||
return taosReleaseRef(clientReqRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
|
||||||
|
|
||||||
void taos_init_imp(void) {
|
void taos_init_imp(void) {
|
||||||
// In the APIs of other program language, taos_cleanup is not available yet.
|
// In the APIs of other program language, taos_cleanup is not available yet.
|
||||||
|
|
|
@ -135,6 +135,16 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
if (TD_RES_QUERY(res)) {
|
if (TD_RES_QUERY(res)) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
|
} else if (TD_RES_TMQ(res)) {
|
||||||
|
SMqRspObj *pRsp = (SMqRspObj *)res;
|
||||||
|
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||||
|
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||||
|
if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema);
|
||||||
|
if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName);
|
||||||
|
if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags);
|
||||||
|
if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema);
|
||||||
|
pRsp->resInfo.pRspMsg = NULL;
|
||||||
|
doFreeReqResultInfo(&pRsp->resInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
|
||||||
#if FILE_WITH_LOCK
|
#if FILE_WITH_LOCK
|
||||||
taosThreadRwlockWrlock(&(pFile->rwlock));
|
taosThreadRwlockWrlock(&(pFile->rwlock));
|
||||||
#endif
|
#endif
|
||||||
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/
|
assert(pFile->fd >= 0); // Please check if you have closed the file.
|
||||||
|
|
||||||
int64_t nleft = count;
|
int64_t nleft = count;
|
||||||
int64_t nwritten = 0;
|
int64_t nwritten = 0;
|
||||||
|
|
|
@ -222,7 +222,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
||||||
tsLogObj.logHandle->pFile = pFile;
|
tsLogObj.logHandle->pFile = pFile;
|
||||||
tsLogObj.lines = 0;
|
tsLogObj.lines = 0;
|
||||||
tsLogObj.openInProgress = 0;
|
tsLogObj.openInProgress = 0;
|
||||||
taosSsleep(3);
|
taosSsleep(10);
|
||||||
taosCloseLogByFd(pOldFile);
|
taosCloseLogByFd(pOldFile);
|
||||||
|
|
||||||
uInfo(" new log file:%d is opened", tsLogObj.flag);
|
uInfo(" new log file:%d is opened", tsLogObj.flag);
|
||||||
|
|
Loading…
Reference in New Issue