Workfile
-
class alpine.workfile.Workfile(base_url, session, token)
A class for interacting with workfiles. The top-level methods handle workfile management. The nested class Process runs workflows (optionally with workflow variables) and works with the results of those runs.
-
class Process(base_url, session, token)
A class for running workflows and working with the results of those runs.
-
download_results(workflow_id, process_id)
Downloads the results of a workflow run.
Parameters: - workflow_id (int) – ID of the workflow.
- process_id (str) – ID of a particular workflow run.
Returns: JSON object of workflow results.
Return type: dict
Raises: - WorkspaceNotFoundException – The workspace does not exist.
- WorkfileNotFoundException – The workflow does not exist.
- ResultsNotFoundException – The results were not found or do not match the expected structure.
Example:
>>> downloaded_flow_results = session.workfile.process.download_results(workflow_id = 375,
...                                                                    process_id = process_id)
-
static find_operator(operator_name, flow_results)
Helper method that parses a downloaded workflow result and extracts the data for a single operator.
Parameters: - operator_name (str) – Operator name to extract. Must be an exact match to the name in the workflow.
- flow_results (dict) – JSON object of Alpine flow results from download_results.
Returns: Single operator data.
Return type: dict
Raises: FlowResultsMalformedException – Workflow result does not contain the key [‘outputs’].
Example:
>>> operator_data = session.workfile.process.find_operator(operator_name = 'Row Filter',
...                                                        flow_results = downloaded_flow_results)
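The lookup that find_operator performs can be pictured with the sketch below. It is a hypothetical stand-in, not the library's code: it assumes each entry in flow_results['outputs'] is a dict whose operator name sits under an 'out_title' key (a guess about the result structure), and it defines its own placeholder for FlowResultsMalformedException. What the real method does when no operator matches is not documented; this sketch returns None.

```python
class FlowResultsMalformedException(Exception):
    """Local placeholder for the library exception of the same name."""


def find_operator_sketch(operator_name, flow_results):
    # Hypothetical re-implementation of Process.find_operator.
    # The documented failure mode: no 'outputs' key in the result.
    if "outputs" not in flow_results:
        raise FlowResultsMalformedException(
            "Workflow result does not contain the key ['outputs'].")
    for operator_data in flow_results["outputs"]:
        # ASSUMPTION: the operator name is stored under "out_title".
        # Exact, case-sensitive comparison, as the documentation requires.
        if operator_data.get("out_title") == operator_name:
            return operator_data
    # Behavior for a missing operator is an assumption of this sketch.
    return None
```

Because the match is exact, 'row filter' would not find an operator named 'Row Filter'.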
-
static get_metadata(flow_results)
Returns the metadata for a particular workflow run, including time, number of operators, user, and number of runtime errors.
Parameters: flow_results (dict) – JSON object of Alpine flow results from download_results.
Returns: Run metadata.
Return type: dict
Raises: FlowResultsMalformedException – Workflow result does not contain the key [‘flowMetaInfo’].
Example:
>>> session.workfile.process.get_metadata(flow_results = downloaded_flow_results)
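The documented contract of get_metadata (return the run metadata stored under 'flowMetaInfo', or raise FlowResultsMalformedException when that key is absent) can be sketched as follows. The function name and the local exception class are placeholders for illustration, not the library's implementation, and the fields inside the metadata are not assumed here.

```python
class FlowResultsMalformedException(Exception):
    """Local placeholder for the library exception of the same name."""


def get_metadata_sketch(flow_results):
    # Hypothetical re-implementation of Process.get_metadata:
    # hand back whatever is stored under the documented 'flowMetaInfo' key.
    try:
        return flow_results["flowMetaInfo"]
    except KeyError:
        # Suppress the KeyError context so callers see only the documented exception.
        raise FlowResultsMalformedException(
            "Workflow result does not contain the key ['flowMetaInfo'].") from None
```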
-
query_status(process_id)
Returns the status of a running workflow.
Parameters: process_id (str) – ID of a particular workflow run.
Returns: State of the workflow run. One of ‘WORKING’, ‘FINISHED’, or ‘FAILED’.
Return type: str
Raises: RunFlowFailureException – Process ID not found.
Example:
>>> session.workfile.process.query_status(process_id = process_id)
-
run(workflow_id, variables=None)
Runs a workflow, optionally with a list of workflow variables. Returns a process_id, which the other Process methods need in order to query, stop, or download the results of that run.
Parameters: - workflow_id (str) – ID of the workflow.
- variables (list) – A list of workflow variables with the format: [{"name": "wfv_name_1", "value": "wfv_value_1"}, {"name": "wfv_name_2", "value": "wfv_value_2"}]
Returns: ID for the workflow run process.
Return type: str
Raises: - WorkspaceNotFoundException – The workspace does not exist.
- WorkfileNotFoundException – The workfile does not exist.
Example:
>>> work_flow_variables = [{"name": "@row_filter", "value": "13"}]
>>> process_id = session.workfile.process.run(workflow_id = 375, variables = work_flow_variables)
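Since run expects variables as a list of name/value dicts, a small helper can build that list from a plain dict. The helper name is hypothetical; only the list format comes from the documentation above. Converting values to strings mirrors the documented example ("13" rather than 13); whether the API also accepts non-string values is not stated.

```python
def to_workflow_variables(values):
    """Convert {"@name": value, ...} into the list format that run() expects.

    Hypothetical helper; dict insertion order is preserved in the output.
    """
    # str() mirrors the documented example, which passes "13" as a string.
    return [{"name": name, "value": str(value)} for name, value in values.items()]
```

It would be passed as variables = to_workflow_variables({"@row_filter": 13}).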
-
stop(process_id)
Attempts to stop a running workflow.
Parameters: process_id (str) – Process ID of the workflow run.
Returns: Flow status. One of ‘STOPPED’ or ‘STOP FAILED’.
Return type: str
Raises: StopFlowFailureException – Workflow run not found.
Example:
>>> session.workfile.process.stop(process_id = process_id)
-
wait_until_finished(workflow_id, process_id, verbose=False, query_time=10, timeout=3600)
Waits for a running workflow to finish.
Parameters: - workflow_id (int) – ID of the workflow.
- process_id (str) – ID of a particular workflow run.
- verbose (bool) – If True, print the approximate run time.
- query_time (float) – Number of seconds between status queries. Minimum of 1 second.
- timeout (float) – Maximum number of seconds to wait for the workflow to finish. If exceeded, waiting stops and RunFlowTimeoutException is raised.
Returns: Workflow run status.
Return type: str
Raises: - RunFlowTimeoutException – Workflow runtime has exceeded timeout.
- RunFlowFailureException – A status of ‘FAILED’ is detected.
Example:
>>> session.workfile.process.wait_until_finished(workflow_id = workflow_id, process_id = process_id)
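The documented behavior of wait_until_finished (poll the run status every query_time seconds, never more often than once per second, and raise on 'FAILED' or when timeout is exceeded) can be sketched as a polling loop. This is an illustration under stated assumptions, not the library's code: query_status, sleep, and clock are injected so the loop can be exercised without a server, workflow_id is omitted because the loop only needs the process ID, and both exception classes are local placeholders.

```python
import time


class RunFlowTimeoutException(Exception):
    """Local placeholder for the library exception of the same name."""


class RunFlowFailureException(Exception):
    """Local placeholder for the library exception of the same name."""


def wait_until_finished_sketch(query_status, process_id, verbose=False,
                               query_time=10, timeout=3600,
                               sleep=time.sleep, clock=time.monotonic):
    # Enforce the documented minimum of 1 second between status queries.
    query_time = max(query_time, 1)
    start = clock()
    while True:
        status = query_status(process_id)
        elapsed = clock() - start
        if verbose:
            print("Status {0} after ~{1:.0f} s".format(status, elapsed))
        if status == "FINISHED":
            return status
        if status == "FAILED":
            raise RunFlowFailureException(
                "Workflow run {0} failed.".format(process_id))
        # Still running: give up once the timeout has been exceeded.
        if elapsed > timeout:
            raise RunFlowTimeoutException(
                "Workflow run {0} exceeded {1} s.".format(process_id, timeout))
        sleep(query_time)
```

Using a monotonic clock keeps the elapsed-time measurement unaffected by system clock adjustments during a long run.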
-
-
delete(workfile_id)
Deletes a workfile from a workspace.
Parameters: workfile_id (int) – ID of workfile to delete.
Returns: None.
Return type: NoneType
Raises: - WorkspaceNotFoundException – The workspace does not exist.
- WorkfileNotFoundException – The workfile does not exist.
Example:
>>> session.workfile.delete(workfile_id = 375)
-
download(workfile_id)
Downloads an Alpine workfile. Alpine workflows cannot be downloaded with this method.
Parameters: workfile_id (int) – ID of the workfile to download.
Returns: The downloaded workfile.
Return type: file
Raises: WorkfileNotFoundException – Workfile ID does not exist or is an Alpine workflow.
Example:
>>> workfile = session.workfile.download(workfile_id = 1351)
-
get(workfile_id)
Returns metadata for a workfile.
Parameters: workfile_id (str) – ID of workfile.
Returns: Selected workfile’s metadata.
Return type: dict
Raises: - WorkspaceNotFoundException – The workspace does not exist.
- WorkfileNotFoundException – The workfile does not exist.
Example:
>>> session.workfile.get(workfile_id = 375)
-
get_id(workfile_name, workspace_id)
Returns the ID of a workfile in a workspace.
Parameters: - workfile_name (str) – Name of the workfile.
- workspace_id (int) – ID of the workspace that contains the workfile.
Returns: ID of the workfile.
Return type: int
Raises: - WorkspaceNotFoundException – The workspace does not exist.
- WorkfileNotFoundException – The workfile does not exist.
Example:
>>> session.workfile.get_id(workfile_name = "WineData", workspace_id = 1672)
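One way to resolve a workfile name to its ID is to scan workfile metadata (such as the list returned by get_list) for an exact name match. The sketch below works on an already-fetched list and assumes each metadata dict carries 'file_name' and 'id' keys; those key names are assumptions about the response, not something this page documents, and WorkfileNotFoundException is a local placeholder.

```python
class WorkfileNotFoundException(Exception):
    """Local placeholder for the library exception of the same name."""


def get_id_sketch(workfile_name, workfiles):
    # workfiles: list of workfile metadata dicts, e.g. the output of get_list().
    # ASSUMPTION: names live under "file_name" and IDs under "id".
    for workfile in workfiles:
        if workfile.get("file_name") == workfile_name:
            return workfile["id"]
    raise WorkfileNotFoundException(
        "Workfile {0!r} not found.".format(workfile_name))
```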
-
get_list(workspace_id, per_page=100)
Returns all workfiles in a workspace.
Parameters: - workspace_id (int) – ID of the workspace.
- per_page (int) – Maximum number of workfiles to fetch per API call.
Returns: List of workfiles’ metadata.
Return type: list of dict
Raises: WorkspaceNotFoundException – The workspace does not exist.
Example:
>>> session.workfile.get_list(workspace_id = 1672)
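The per_page parameter implies that get_list collects a workspace's workfiles over several API calls. A generic page-collecting loop is sketched below. fetch_page(page, per_page) is a hypothetical callable standing in for one API call, and both the 1-based page numbering and the rule of stopping when a page comes back shorter than per_page are assumed conventions, not documented behavior of the Alpine API.

```python
def collect_all_pages(fetch_page, per_page=100):
    # fetch_page(page, per_page) -> list of items for that page (hypothetical).
    if per_page < 1:
        raise ValueError("per_page must be at least 1")
    items = []
    page = 1  # ASSUMPTION: pages are numbered from 1.
    while True:
        batch = fetch_page(page, per_page)
        items.extend(batch)
        if len(batch) < per_page:
            # A short or empty page means there is nothing more to fetch.
            return items
        page += 1
```

A larger per_page means fewer API calls at the cost of larger responses.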
-
upload(workspace_id, afm_file, data_sources_list)
Uploads an Alpine workfile (.afm format) and alters it to use the chosen data source(s). The operators in a workflow must remain consistent with the type of data source: a workflow built on a Hadoop data source can be converted to use a different Hadoop data source, but not a database.
Parameters: - workspace_id (int) – ID of workspace.
- afm_file (str) – Local path to the Alpine workfile (.afm).
- data_sources_list (list) – A list of data source information with the following format:
  datasource_info = [
      {"data_source_type": DataSource.dsType.HadoopCluster, "data_source_id": "1", "database_id": ""},
      {"data_source_type": DataSource.dsType.JDBCDataSource, "data_source_id": "421", "database_id": ""},
      {"data_source_type": DataSource.dsType.GreenplumDatabase, "data_source_id": "1", "database_id": "42"}
  ]
Returns: Metadata of the uploaded workfile.
Return type: dict
Example:
>>> import os
>>> base_dir = os.getcwd()
>>> afm_path = os.path.join(base_dir, "afm", "test.afm")
>>> datasource_info = [{"data_source_type": session.datasource.dsType.GreenplumDatabase,
...                     "data_source_id": 1,
...                     "database_id": 42}]
>>> workfile_info = session.workfile.upload(workspace_id, afm_path, datasource_info)
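Each data_sources_list entry is a dict with three keys, and entries without a database ID carry an empty string, as in the documented Hadoop and JDBC examples. The hypothetical helper below builds the list from (type, data_source_id, database_id) tuples. DsType is a local Enum standing in for DataSource.dsType, and its member values are illustrative only; real code should use the library's own dsType values.

```python
from enum import Enum


class DsType(Enum):
    # Stand-in for DataSource.dsType; the values here are illustrative only.
    HadoopCluster = "HadoopCluster"
    JDBCDataSource = "JDBCDataSource"
    GreenplumDatabase = "GreenplumDatabase"


def build_data_sources_list(entries):
    """entries: iterable of (ds_type, data_source_id, database_id or None)."""
    return [
        {
            "data_source_type": ds_type,
            "data_source_id": data_source_id,
            # No database ID: use "" as the documented non-database entries do.
            "database_id": "" if database_id is None else database_id,
        }
        for ds_type, data_source_id, database_id in entries
    ]
```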
-
class