어떻게 python에서 병렬처리하기 위해 concurrent.future를 작성하나요? Tableau Server REST API

1개월 전 질문 1개월 전 토론 9 views

병렬처리는 처음인데요.

Tableau에서 workbook 3개를 리프레쉬 하려고 합니다.

workbook_dict = {"workbook1":"workbook_id1","workbook2":"workbook_id2","workbook3":"workbook_id3"}

 

@retry(tries=3, delay=5, backoff=0.2)

def refresh_workbooks(self, workbook_dict):

    for workbook_name, workbook_id in workbook_dict.items():

        workbook = self.server.workbooks.get_by_id(workbook_id)

 

        #refresh will fire up the refresh, and return job_id object

        job_id = self.server.workbooks.refresh(workbook)

 

        #wait_for job will check the status and raise exception if fails or timeout

        #https://tableau.github.io/server-client-python/docs/api-ref#jobswait_for_job

 

        self.server.jobs.wait_for_job(job_id,timeout=1000)

 이 기본 코드는 잘 작동하며 각 워크북을 완료하는 데 15분이 걸리므로 총 45분이 소요됩니다.

두 번째 워크북에서 실패하면 처음부터 다시 시작됩니다.

 

병렬처리를 사용하여 속도를 높이고 wait_for_job을 확인하고 싶은데요.

실패한 경우 해당 워크북만 새로 고친 후 오류를 발생시키기 전에 몇 번 다시 시도했으면 합니다.

 

첫 번째 질문: 아래 코드는 병렬처리를 시도하지만 출력된 데이터 개체는 None을 리턴합니다.

코드가 wait_for job을 실행하지 못한 것 같은데 왜 그런가요?

import concurrent.futures
import urllib.request
import tableauserverclient as TSC

def refresh_workbook(self, workbook_id):
    workbook = self.server.workbooks.get_by_id(workbook_id)
    job_id = self.server.workbooks.refresh(workbook)
    return job_id

def refresh_workbooks(self, workbook_dict):
    job_dict = {}
    try:
        for workbook_name, workbook_id in workbook_dict.items():
            workbook = self.server.workbooks.get_by_id(workbook_id)
            job_id = self.server.workbooks.refresh(workbook)
            job_dict[workbook_name] = job_id.id
            print(job_dict)
    except:
        raise

    return job_dict


def wait_workbook(self, job_id, timeout=None):
    self.server.jobs.wait_for_job(job_id, timeout=timeout)


test = TableauServerConnection()
workbook_dict = {"workbook1":"workbook_id1","workbook2":"workbook_id2","workbook3":"workbook_id3"}
jobs = test.refresh_workbooks(workbook_dict)


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    
    future_to_job = {executor.submit(test.wait_workbook, job_id, 1800): job_id for workbook_name, job_id in jobs.items()}
    for future in concurrent.futures.as_completed(future_to_job):
        job = future_to_job[future]
        try:
            data = future.result()
            #Why did I get None in data? 
            print('data',data)
        except Exception as exc:
            #can I spin up new future here and what is the correct syntax?

            print('%s generated an exception: %s' % (job, exc))
        else:
            print('job', job)

 

 두번째 질문: exception에서 재시도를 추가할 경우 새로운 future object를 추가할 수 있나요?

python 병렬처리 concurrent.future tableau

2022-05-21 12:09

1개의 해답

이런 걸 시도해 보셔야 할 것 같은데요,

while 루프를 사용해서 마지막 결과를 얻었습니다.

그러면 완료된 각 작업에 대해 몇 가지 새 작업을 추가할 수 있습니다.

얘기주신 예에서는 작업이 실패하는 경우에만 해당됩니다.

 

그러나 이렇게 하면 처음부터 모든 작업을 시작할 때 submit하지 않아도 됩니다.

실행할 작업 수에 따라 처음에 X개의 작업를 추가한 다음 태스크를 완료할 때마다 하나씩 추가하는 것이 좋습니다.

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

future_to_job = {executor.submit(test.wait_workbook, job_id, 1800): job_id for workbook_name, job_id in jobs.items()}
while future_to_job:
    done, _ = concurrent.futures.wait(
            future_to_job, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

    for future in done:

        try:
            data = future.result()
            #Why did I get None in data? 
            print('data',data)
        except Exception as exc:
            #can I spin up new future here and what is the correct syntax?
            print('%s generated an exception: %s' % (job, exc))
            # get the jobs id 
            job_id = future_to_job[future]
            # clean up the old task
            del future_to_job[future]
            # add the wen one
            future_to_job[executor.submit(test.wait_workbook, job_id, 1800)] = job_id
        else:
            print('job', job)

None 부분의 경우 wait_workbook 함수가 결과를 반환해야 합니다.

def wait_workbook(self, job_id, timeout=None):
    return self.server.jobs.wait_for_job(job_id, timeout=timeout


2022-05-21 12:14

해결방법이나 팁을 알고 계신다면


© 2022 pinfo. All rights reserved.