병렬처리는 처음인데요.
Tableau에서 workbook 3개를 리프레쉬 하려고 합니다.
workbook_dict = {"workbook1":"workbook_id1","workbook2":"workbook_id2","workbook3":"workbook_id3"}
@retry(tries=3, delay=5, backoff=0.2)
def refresh_workbooks(self, workbook_dict):
for workbook_name, workbook_id in workbook_dict.items():
workbook = self.server.workbooks.get_by_id(workbook_id)
#refresh will fire up the refresh, and return job_id object
job_id = self.server.workbooks.refresh(workbook)
#wait_for job will check the status and raise exception if fails or timeout
#https://tableau.github.io/server-client-python/docs/api-ref#jobswait_for_job
self.server.jobs.wait_for_job(job_id,timeout=1000)
이 기본 코드는 잘 작동하며 각 워크북을 완료하는 데 15분이 걸리므로 총 45분이 소요됩니다.
두 번째 워크북에서 실패하면 처음부터 다시 시작됩니다.
병렬처리를 사용하여 속도를 높이고 wait_for_job을 확인하고 싶은데요.
실패한 경우 해당 워크북만 새로 고친 후 오류를 발생시키기 전에 몇 번 다시 시도했으면 합니다.
첫 번째 질문: 아래 코드는 병렬처리를 시도하지만 출력된 데이터 개체는 None을 리턴합니다.
코드가 wait_for job을 실행하지 못한 것 같은데 왜 그런가요?
import concurrent.futures
import urllib.request
import tableauserverclient as TSC
def refresh_workbook(self, workbook_id):
workbook = self.server.workbooks.get_by_id(workbook_id)
job_id = self.server.workbooks.refresh(workbook)
return job_id
def refresh_workbooks(self, workbook_dict):
job_dict = {}
try:
for workbook_name, workbook_id in workbook_dict.items():
workbook = self.server.workbooks.get_by_id(workbook_id)
job_id = self.server.workbooks.refresh(workbook)
job_dict[workbook_name] = job_id.id
print(job_dict)
except:
raise
return job_dict
def wait_workbook(self, job_id, timeout=None):
self.server.jobs.wait_for_job(job_id, timeout=timeout)
test = TableauServerConnection()
workbook_dict = {"workbook1":"workbook_id1","workbook2":"workbook_id2","workbook3":"workbook_id3"}
jobs = test.refresh_workbooks(workbook_dict)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_job = {executor.submit(test.wait_workbook, job_id, 1800): job_id for workbook_name, job_id in jobs.items()}
for future in concurrent.futures.as_completed(future_to_job):
job = future_to_job[future]
try:
data = future.result()
#Why did I get None in data?
print('data',data)
except Exception as exc:
#can I spin up new future here and what is the correct syntax?
print('%s generated an exception: %s' % (job, exc))
else:
print('job', job)
두번째 질문: exception에서 재시도를 추가할 경우 새로운 future object를 추가할 수 있나요?
python 병렬처리 concurrent.future tableau
이런 걸 시도해 보셔야 할 것 같은데요,
while 루프를 사용해서 마지막 결과를 얻었습니다.
그러면 완료된 각 작업에 대해 몇 가지 새 작업을 추가할 수 있습니다.
얘기주신 예에서는 작업이 실패하는 경우에만 해당됩니다.
그러나 이렇게 하면 처음부터 모든 작업을 시작할 때 submit하지 않아도 됩니다.
실행할 작업 수에 따라 처음에 X개의 작업를 추가한 다음 태스크를 완료할 때마다 하나씩 추가하는 것이 좋습니다.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_job = {executor.submit(test.wait_workbook, job_id, 1800): job_id for workbook_name, job_id in jobs.items()}
while future_to_job:
done, _ = concurrent.futures.wait(
future_to_job, timeout=0.25,
return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
try:
data = future.result()
#Why did I get None in data?
print('data',data)
except Exception as exc:
#can I spin up new future here and what is the correct syntax?
print('%s generated an exception: %s' % (job, exc))
# get the jobs id
job_id = future_to_job[future]
# clean up the old task
del future_to_job[future]
# add the wen one
future_to_job[executor.submit(test.wait_workbook, job_id, 1800)] = job_id
else:
print('job', job)
None 부분의 경우 wait_workbook 함수가 결과를 반환해야 합니다.
def wait_workbook(self, job_id, timeout=None):
return self.server.jobs.wait_for_job(job_id, timeout=timeout
© 2022 pinfo. All rights reserved.