Source code for langchain.utilities.bash
"""Wrapper around subprocess to run commands."""
import subprocess
from typing import List, Union
[docs]class BashProcess:
"""Executes bash commands and returns the output."""
def __init__(self, strip_newlines: bool = False, return_err_output: bool = False):
"""Initialize with stripping newlines."""
self.strip_newlines = strip_newlines
self.return_err_output = return_err_output
[docs] def run(self, commands: Union[str, List[str]]) -> str:
"""Run commands and return final output."""
if isinstance(commands, str):
commands = [commands]
commands = ";".join(commands)
try:
output = subprocess.run(
commands,
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
).stdout.decode()
except subprocess.CalledProcessError as error:
if self.return_err_output:
return error.stdout.decode()
return str(error)
if self.strip_newlines:
output = output.strip()
return output