Workfile

class alpine.workfile.Workfile(base_url, session, token)

A class for interacting with workfiles. The top-level methods deal with workfile management. The subclass Process can be used to interact with individual workfiles, including running workflows with workflow variables.

class Process(base_url, session, token)

A class for interacting with workfiles.

download_results(workflow_id, process_id)

Downloads a workflow run result.

Parameters:
  • workflow_id (int) – ID of the workflow.
  • process_id (str) – ID of a particular workflow run.
Returns:

JSON object of workflow results.

Return type:

dict

Raises:
  • WorkspaceNotFoundException – The workspace does not exist.
  • WorkfileNotFoundException – The workflow does not exist.
  • ResultsNotFoundException – Results not found or does not match expected structure.

Example:

>>> downloaded_flow_results = session.workfile.process.download_results(workflow_id = 375,
>>>                                                                     process_id = process_id)
static find_operator(operator_name, flow_results)

Helper method to parse a downloaded workflow result to extract data for a single operator.

Parameters:
  • operator_name (str) – Operator name to extract. Must be an exact match to the name in the workflow.
  • flow_results (dict) – JSON object of Alpine flow results from download_results.
Returns:

Single operator data.

Return type:

dict

Raises:

FlowResultsMalformedException – Workflow result does not contain the key [‘outputs’].

Example:

>>> operator_data = session.workfile.process.find_operator(operator_name = 'Row Filter',
>>>                                                        flow_results = downloaded_flow_results)
static get_metadata(flow_results)

Returns the metadata for a particular workflow run including time, number of operators, user, and number of runtime errors.

Parameters:flow_results (dict) – JSON object of Alpine flow results from download_results.
Returns:Run metadata.
Return type:dict
Raises:FlowResultsMalformedException – Workflow result does not contain the key [‘flowMetaInfo’].

Example:

>>> session.workfile.process.get_metadata(flow_results = downloaded_flow_results)
query_status(process_id)

Returns the status of a running workflow.

Parameters:process_id (str) – ID of a particular workflow run.
Returns:State of workflow run. One of ‘WORKING’, ‘FINISHED’, or ‘FAILED’.
Return type:str
Raises:RunFlowFailureException – Process ID not found.

Example:

>>> session.workfile.process.query_status(process_id = process_id)
run(workflow_id, variables=None)

Run a workflow, optionally including a list of workflow variables. Returns a process_id which is needed by other functions which query a run or download results.

Parameters:
  • workflow_id (str) – ID of the workflow.
  • variables (list) – A list of workflow variables with the format: [ {“name”: “wfv_name_1”, “value”: “wfv_value_1”}, {“name”: “wfv_name_2”, “value”: “wfv_value_2”} ]
Returns:

ID for the workflow run process.

Return type:

str

Raises:
  • WorkspaceNotFoundException – The workspace does not exist.
  • WorkfileNotFoundException – The workfile does not exist.

Example:

>>> work_flow_variables = [{"name": "@row_filter", "value": "13"}]
>>> process_id = session.workfile.process.run(workflow_id = 375, variables = work_flow_variables)
stop(process_id)

Attempts to stop a running workflow.

Parameters:process_id (str) – Process ID of the workflow.
Returns:Flow status. One of ‘STOPPED’ or ‘STOP FAILED’.
Return type:str
Raises:StopFlowFailureException – Workflow run not found.

Example:

>>> session.workfile.process.stop(process_id = process_id)
wait_until_finished(workflow_id, process_id, verbose=False, query_time=10, timeout=3600)

Waits for a running workflow to finish.

Parameters:
  • workflow_id (int) – ID of a particular workflow run.
  • process_id (str) – ID of a particular workflow run.
  • verbose (bool) – Optionally print approximate run time.
  • query_time (float) – Number of seconds between status queries. Minimum of 1 second.
  • timeout (float) – Amount of time in seconds to wait for workflow to finish. Will stop if exceeded.
Returns:

Workflow run status.

Return type:

str

Raises:
  • RunFlowTimeoutException – Workflow runtime has exceeded timeout.
  • RunFlowFailureException – Status of FAILURE is detected.

Example:

>>> session.workfile.process.wait_until_finished(workflow_id = workflow_id, process_id = process_id)
delete(workfile_id)

Deletes a workfile from a workspace.

Parameters:

workfile_id (int) – ID of workfile to delete.

Returns:

None.

Return type:

NoneType

Raises:
  • WorkspaceNotFoundException – The workspace does not exist.
  • WorkfileNotFoundException – The workfile does not exist.

Example:

>>> session.workfile.delete(workflow_id = 375)
download(workfile_id)

Download an Alpine workfile. Will not download Alpine workflows.

Parameters:workfile_id (int) – ID of the workfile to download.
Returns:file
Return type:file
Raises:WorkfileNotFoundException – Workfile ID does not exist or is an Alpine workflow.

Example:

>>> operator_data = session.workfile.download(workfile_id = 1351)
get(workfile_id)

Returns metadata for a workfile.

Parameters:

workfile_id (str) – ID of workfile.

Returns:

Selected workfile’s metadata.

Return type:

dict

Raises:
  • WorkspaceNotFoundException – The workspace does not exist.
  • WorkfileNotFoundException – The workfile does not exist.

Example:

>>> session.workfile.get(workflow_id = 375)
get_id(workfile_name, workspace_id)

Returns the ID of a workfile in a workspace.

Parameters:
  • workfile_name (str) – Name of the workfile.
  • workspace_id (int) – ID of the workspace that contains the workfile.
Returns:

ID of the workfile.

Return type:

int

Raises:
  • WorkspaceNotFoundException – The workspace does not exist.
  • WorkfileNotFoundException – The workfile does not exist.

Example:

>>> session.workspace.get_id(workfile_name = "WineData", workspace_id = "APITests")
get_list(workspace_id, per_page=100)

Returns all workfiles in a workspace.

Parameters:
  • workspace_id (int) – ID of the workspace.
  • per_page (int) – Maximum number to fetch with each API call.
Returns:

List of workfiles’ metadata.

Return type:

list of dict

Raises:

WorkspaceNotFoundException – The workspace does not exist.

Example:

>>> session.workfile.get_list(workspace_id = 1672)
upload(workspace_id, afm_file, data_sources_list)

Uploads an Alpine workfile file (.afm format). Will alter the workfile to use the data source(s) chosen. Operators within a workflow must remain consistent with type of data source. A workflow built on a Hadoop data source can be converted to use a different Hadoop data source, but not to use a database.

Parameters:
  • workspace_id (int) – ID of workspace.
  • afm_file (str) – Local path to the Alpine workfile (.afm).
  • data_sources_list (list) – A list of data source information with the following format: datasource_info = [ {“data_source_type”: DataSource.dsType.HadoopCluster, “data_source_id”: “1”, “database_id”:”“}, {“data_source_type”: DataSource.dsType.JDBCDataSource, “data_source_id”: “421”, “database_id”: “”}, {“data_source_type”: DataSource.dsType.GreenplumDatabase, “data_source_id”: “1”, “database_id”: “42”} ]
Returns:

Selected workfile’s metadata.

Return type:

dict

Example:

>>> base_dir = os.getcwd()
>>> afm_path = "{0}/afm/test.afm".format(base_dir)
>>> datasource_info = [{"data_source_type": session.datasource.dsType.GreenplumDatabase,
>>>                     "data_source_id": 1,
>>>                     "database_id": 42}]
>>> workfile_info = session.workfile.upload(workspace_id, afm_path, datasource_info)