# Copyright 2020 Virginia Polytechnic Institute and State University.
""" OpenFOAM I/O.
These functions are accesible from ``dafi.random_field.foam``.
"""
# standard library imports
import os
import shutil
import re
import subprocess
# third party imports
import numpy as np
# global variables
NDIM = {'scalar': 1,
'vector': 3,
'symmTensor': 6,
'tensor': 9}
# get mesh properties by running OpenFOAM shell utilities
def _bash_source_of(file=None):
""" System-specific file to source OpenFOAM on the subprocess call.
Most installation on linux should not require this.
Parameters
----------
file : str
Name (path) of file that sources OpenFOAM.
Returns
-------
bash_command : str
Shell command to source OpenFOAM.
"""
if file == None:
bash_command = ""
else:
bash_command = f"source {file} > /dev/null 2>&1; "
return bash_command
def _check0(foam_case):
""" Create OpenFOAM 0 directory if it does not exist.
Copies the 0.orig folder if the 0 folder does not exist.
Parameters
----------
foam_case : str
Name (path) of OF case directory.
Returns
-------
dirCreated : bool
Whether the 0 directory was created. If it exists already
returns *'False'*.
"""
dst = os.path.join(foam_case, '0')
dirCreated = False
if not os.path.isdir(dst):
src = os.path.join(foam_case, '0.orig')
shutil.copytree(src, dst)
dirCreated = True
return dirCreated
def _checkMesh(foam_case, foam_rc=None):
""" Create OpenFOAM mesh if it does not exist.
Requires OpenFOAM to be sourced. Calls ``blockMesh`` utility.
Parameters
----------
foam_case : str
Name (path) of OF case directory.
foam_rc : str
File for sourcing OpenFOAM.
Returns
-------
meshCreated : bool
Whether the mesh was created. If mesh existed already returns
*`False`*.
"""
meshdir = os.path.join(foam_case, 'constant', 'polyMesh')
meshCreated = False
if not os.path.isdir(meshdir):
bash_command = _bash_source_of(foam_rc)
bash_command += "blockMesh -case " + foam_case
subprocess.call(bash_command, shell=True)
meshCreated = True
return meshCreated
[docs]def get_number_cells(foam_case='.', foam_rc=None):
""" Get the number of cells in an OpenFOAM case.
Requires OpenFOAM to be sourced, since it calls the ``checkMesh``
utility.
Parameters
----------
foam_case : str
Name (path) of OF case directory.
foam_rc : str
File for sourcing OpenFOAM.
Returns
-------
ncells : int
Number of cells.
"""
bash_command = _bash_source_of(foam_rc)
bash_command += "checkMesh -case " + foam_case + \
" -time '0' | grep ' cells:'"
cells = subprocess.check_output(bash_command, shell=True)
cells = cells.decode("utf-8").replace('\n', '').split(':')[1].strip()
return int(cells)
[docs]def get_cell_centres(foam_case='.', group='internalField', keep_file=False,
foam_rc=None):
""" Get the coordinates of cell centers in an OpenFOAM case.
Requires OpenFOAM to be sourced.
Parameters
----------
foam_case : str
Name (path) of OF case directory.
keep_file : bool
Whether to keep the file (C) generated by the OpenFOAM
post-processing utility.
foam_rc : str
File for sourcing OpenFOAM.
Returns
-------
coords : ndarray
Cell center coordinates (x, y, z).
*dtype=float*, *ndim=2*, *shape=(ncells, 3)*
"""
timedir = '0'
del0 = _check0(foam_case)
delMesh = _checkMesh(foam_case)
bash_command = _bash_source_of(foam_rc)
bash_command += "postProcess -func writeCellCentres " + \
"-case " + foam_case + f" -time '{timedir}' " + "> /dev/null"
subprocess.call(bash_command, shell=True)
os.remove(os.path.join(foam_case, timedir, 'Cx'))
os.remove(os.path.join(foam_case, timedir, 'Cy'))
os.remove(os.path.join(foam_case, timedir, 'Cz'))
file = os.path.join(foam_case, timedir, 'C')
coords = read_cell_centres(file, group=group)
if not keep_file:
os.remove(file)
if del0:
shutil.rmtree(os.path.join(foam_case, timedir))
if delMesh:
shutil.rmtree(os.path.join(foam_case, 'constant', 'polyMesh'))
return coords
[docs]def get_cell_volumes(foam_case='.', keep_file=False, foam_rc=None):
""" Get the volume of each cell in an OpenFOAM case.
Requires OpenFOAM to be sourced.
Parameters
----------
foam_case : str
Name (path) of OF case directory.
keep_file : bool
Whether to keep the file (V) generated by the OpenFOAM
post-processing utility.
foam_rc : str
File for sourcing OpenFOAM.
Returns
-------
vol : ndarray
Cell volumes.
*dtype=float*, *ndim=1*, *shape=(ncells)*
"""
timedir = '0'
del0 = _check0(foam_case)
delMesh = _checkMesh(foam_case)
bash_command = _bash_source_of(foam_rc)
bash_command += "postProcess -func writeCellVolumes " + \
"-case " + foam_case + f" -time {timedir} " + "> /dev/null"
subprocess.call(bash_command, shell=True)
file = os.path.join(foam_case, timedir, 'V')
vol = read_cell_volumes(file)
if not keep_file:
os.remove(file)
if del0:
shutil.rmtree(os.path.join(foam_case, timedir))
if delMesh:
shutil.rmtree(os.path.join(foam_case, 'constant', 'polyMesh'))
return vol
[docs]def get_neighbors(foam_case='.'):
""" Get the neighbors of each cell (connectivity).
Parameters
----------
foam_case : str
Name (path) of OF case directory.
Returns
-------
connectivity : dictionary
The keys are cell's index and the values are a list of indices
for the cells that neighbor it.
"""
# create mesh if needed
timedir = '0'
del0 = _check0(foam_case)
delMesh = _checkMesh(foam_case)
ncells = get_number_cells(foam_case)
# read mesh files
mesh_dir = os.path.join(foam_case, 'constant', 'polyMesh')
owner = read_scalar_field(os.path.join(mesh_dir, 'owner'), ' ')
neighbour = read_scalar_field(os.path.join(mesh_dir, 'neighbour'), ' ')
# keep internal faces only
nintfaces = len(neighbour)
owner = owner[:nintfaces]
#
connectivity = {cellid: [] for cellid in range(ncells)}
for iowner, ineighbour in zip(owner, neighbour):
connectivity[int(iowner)].append(int(ineighbour))
connectivity[int(ineighbour)].append(int(iowner))
if del0:
shutil.rmtree(os.path.join(foam_case, timedir))
if delMesh:
shutil.rmtree(os.path.join(foam_case, 'constant', 'polyMesh'))
return connectivity
# read fields
[docs]def read_field(file, ndim, group='internalField'):
""" Read the field values from an OpenFOAM field file.
Can read either the internal field or a specified boundary.
Parameters
----------
file : str
Name of OpenFOAM field file.
ndim : int
Field dimension (e.g. 1 for scalar field).
group : str
Name of the group to read: *'internalField'* or name of specific
boundary.
Returns
-------
data : ndarray
Field values on specified group.
*dtype=float*, *ndim=2*, *shape=(ncells, ndim)*
"""
# read file
with open(file, 'r') as f:
content = f.read()
# keep file portion after specified group
content = content.partition(group)[2]
# data structure
whole_number = r"([+-]?[\d]+)"
decimal = r"([\.][\d]*)"
exponential = r"([Ee][+-]?[\d]+)"
floatn = f"{whole_number}{{1}}{decimal}?{exponential}?"
if ndim == 1:
data_structure = f"({floatn}\\n)+"
else:
data_structure = r'(\(' + f"({floatn}" + r"(\ ))" + \
f"{{{ndim-1}}}{floatn}" + r"\)\n)+"
# extract data
pattern = r'\(\n' + data_structure + r'\)'
data_str = re.compile(pattern).search(content).group()
# convert to numpy array
data_str = data_str.replace('(', '').replace(')', '').replace('\n', ' ')
data_str = data_str.strip()
data = np.fromstring(data_str, dtype=float, sep=' ')
if ndim > 1:
data = data.reshape([-1, ndim])
return data
[docs]def read_scalar_field(file, group='internalField'):
""" Read an OpenFOAM scalar field file.
See :py:meth:`read_field` for more information.
"""
return read_field(file, NDIM['scalar'], group=group)
[docs]def read_vector_field(file, group='internalField'):
""" Read an OpenFOAM vector field file.
See :py:meth:`read_field` for more information.
"""
return read_field(file, NDIM['vector'], group=group)
[docs]def read_symmTensor_field(file, group='internalField'):
""" Read an OpenFOAM symmTensor field file.
See :py:meth:`read_field` for more information.
"""
return read_field(file, NDIM['symmTensor'], group=group)
[docs]def read_tensor_field(file, group='internalField'):
""" Read an OpenFOAM tensor field file.
See :py:meth:`read_field` for more information.
"""
return read_field(file, NDIM['tensor'], group=group)
[docs]def read_cell_centres(file='C', group='internalField'):
""" Read an OpenFOAM mesh coordinate file.
See :py:meth:`read_field` for more information.
"""
return read_vector_field(file, group=group)
[docs]def read_cell_volumes(file='V'):
""" Read an OpenFOAM mesh volume file.
See :py:meth:`read_field` for more information.
"""
return read_scalar_field(file, group='internalField')
# read entire field file
def _read_logo(content):
""" Read info from logo in file header. """
def _read_logo(pat):
pattern = pat + r":\s+\S+"
data_str = re.compile(pattern).search(content).group()
return data_str.split(':')[1].strip()
info = {}
for pat in ['Version', 'Website']:
info[pat] = _read_logo(pat)
return info
def _read_header_info(content):
""" Read info from info section in file header. """
def _read_header(pat):
pattern = pat + r"\s+\S+;"
data_str = re.compile(pattern).search(content).group()
return data_str.split(pat)[1][:-1].strip()
info = {}
foam_class = _read_header('class').split('Field')[0].split('vol')[1]
info['foam_class'] = foam_class[0].lower() + foam_class[1:]
info['name'] = _read_header('object')
try:
info['location'] = _read_header('location')
except AttributeError:
info['location'] = None
return info
[docs]def read_field_file(file):
""" Read a complete OpenFOAM field file.
This includes header information not just the field values.
The output can be directly used to write the file again, e.g.
.. code-block:: python
>>> content = read_field_file(file)
>>> write_field_file(**content).
Parameters
----------
file : str
Name (path) of OpenFOAM field file.
Returns
-------
info : dictionary
The contents of the file organized with the same structure as
the inputs to the :py:meth:`write_field_file` method. See
:py:meth:`write_field_file` for more information.
"""
with open(file, 'r') as f:
content = f.read()
info = {}
info['file'] = file
# read logo
logo_info = _read_logo(content)
info['foam_version'] = logo_info['Version']
info['website'] = logo_info['Website']
# read header
header_info = _read_header_info(content)
info['foam_class'] = header_info['foam_class']
info['name'] = header_info['name']
info['location'] = header_info['location']
# dimension
pattern = r"dimensions\s+.+"
data_str = re.compile(pattern).search(content).group()
info['dimensions'] = data_str.split('dimensions')[1][:-1].strip()
# internalField: uniform/nonuniform
internal = {}
pattern = r'internalField\s+\S+\s+.+'
data_str = re.compile(pattern).search(content).group()
if data_str.split()[1] == 'uniform':
internal['uniform'] = True
tmp = data_str.split('uniform')[1].strip()[:-1]
tmp = tmp.replace('(', '').replace(')', '').split()
internal['value'] = np.array([float(i) for i in tmp])
else:
internal['uniform'] = False
internal['value'] = read_field(file, NDIM[info['foam_class']])
info['internal_field'] = internal
# boundaries: type and value(optional)
# value can be uniform/nonuniform scalar/(multi)
boundaries = []
bcontent = content.split('boundaryField')[1].strip()[1:].strip()
pattern = r'\w+' + r'[\s\n]*' + r'\{' + r'[\w\s\n\(\);\.\<\>\-+]+' + r'\}'
boundaries_raw = re.compile(pattern).findall(bcontent)
for bc in boundaries_raw:
ibc = {}
# name
pattern = r'[\w\s\n]+' + r'\{'
name = re.compile(pattern).search(bc).group()
name = name.replace('{', '').strip()
ibc['name'] = name
# type
pattern = r'type\s+\w+;'
type = re.compile(pattern).search(bc).group()
type = type.split('type')[1].replace(';', '').strip()
ibc['type'] = type
# value
if 'value' in bc:
value = {}
v = bc.split('value')[1]
if v.split()[0] == 'uniform':
value['uniform'] = True
v = v.split('uniform')[1]
tmp = v.replace('}', '').replace(';', '').strip()
tmp = tmp.replace('(', '').replace(')', '').split()
if len(tmp) == 1:
value['data'] = float(tmp[0])
else:
value['data'] = np.array([float(i) for i in tmp])
else:
value['uniform'] = False
value['data'] = read_field(
file, NDIM[info['foam_class']], group=ibc['name'])
else:
value = None
ibc['value'] = value
boundaries.append(ibc)
info['boundaries'] = boundaries
return info
def read_controlDict():
# TODO: Implement.
# read header logo
# read header info
# read content
raise NotImplementedError()
# write fields
[docs]def foam_sep():
""" Write a separation comment line.
Used by :py:meth:`write_field_file`.
Returns
-------
sep : str
Separation comment string
"""
return '\n// ' + '* '*37 + '//'
[docs]def write_controlDict(content, foam_version, website, ofcase=None):
"""
Parameters
----------
content : dict
Content of controlDict.
foam_version : str
OpenFOAM version to write in the header.
website : str
OpenFOAM website to write in the header.
ofcase : str
OpenFOAM directory. File will be written to
<ofcase>/system/controlDict. If *None* file will be written in
current directory.
Returns
-------
file_loc : str
Location (absolute path) of file written.
file_content : str
Content written to file.
"""
# create content
file_str = foam_header_logo(foam_version, website) + '\n'
file_str += foam_header_info('controlDict', 'dictionary', 'system', False)
file_str += '\n' + foam_sep() + '\n'
for key, val in content.items():
file_str += f'\n{key:<16} {val};'
file_str += '\n' + foam_sep()
# write
if ofcase is None:
file = 'controlDict'
else:
file = os.path.join(ofcase, 'system', 'controlDict')
with open(file, 'w') as f:
f.write(file_str)
return os.path.abspath(file), file_str
[docs]def write_field_file(name, foam_class, dimensions, internal_field,
boundaries, foam_version, website, location=None,
file=None):
""" Write an OpenFOAM field file.
Parameters
----------
name : str
Field name (e.g. 'p').
foam_class : str
OpenFOAM class (e.g. 'scalar').
dimensions : str or list
Field dimensions in SI units using OpenFOAM convention
(e.g. '[0 2 -2 0 0 0 0]').
Alternatively can be a list of 7 or 3 integers. If three (MLT)
zeros will be appended at the end, i.e. [M L T 0 0 0 0].
internal_field : dictionary
Dictionary containing internal field information. See note below
for more information.
boundaries : list
List containing one dictionary per boundary. Each dictionary
contains the required information on that boundary. See note
below for more information.
foam_version : str
OpenFOAM version to write in the header.
website : str
OpenFOAM website to write in the header.
location : str
File location (optional).
file : str
File name (path) where to write field. If *'None'* will write in
current directory using the field name as the file name.
Returns
-------
file_loc : str
Location (absolute path) of file written.
file_content : str
Content written to file.
Note
----
**internal_field**
The ``internal_field`` dictionary must have the following
entries:
* **uniform** - *bool*
Whether the internal field has uniform or nonuniform
value.
* **value** - *float* or *ndarray*
The uniform or nonuniform values of the internal field.
**boundaries**
Each boundary dictionary in the ``boundaries`` list must have
the following entries:
* **name** - *str*
Boundary name.
* **type** - *str*
Boundary type.
* **value** - *dict* (optional)
Dictionary with same entries as the *'internal_field'*
dictionary.
"""
def _foam_field(uniform, value, foamclass=None):
def _list_to_foamstr(inlist):
outstr = ''
for l in inlist:
outstr += f'{l} '
return outstr.strip()
def _foam_nonuniform(data):
field = f'{len(data)}\n('
if data.ndim == 1:
for d in data:
field += f'\n{d}'
elif data.ndim == 2:
for d in data:
field += f'\n({_list_to_foamstr(d)})'
else:
raise ValueError('"data" cannot have more than 2 dimensions.')
field += '\n)'
return field
if uniform:
# list type
if isinstance(value, (list, np.ndarray)):
if isinstance(value, np.ndarray):
# value = np.atleast_1d(np.squeeze(value))
value = np.squeeze(value)
if value.ndim != 1:
err_msg = 'Uniform data should have one dimension.'
raise ValueError(err_msg)
value = f'({_list_to_foamstr(value)})'
field = f'uniform {value}'
else:
if foamclass is None:
raise ValueError('foamclass required for nonuniform data.')
value = np.squeeze(value)
field = f'nonuniform List<{foamclass}>'
field += '\n' + _foam_nonuniform(value)
return field
def _foam_dimensions(dimensions):
if isinstance(dimensions, list):
if len(dimensions) == 3:
dimensions = dimensions.append([0, 0, 0, 0])
elif len(dimensions) != 7:
raise ValueError('Dimensions must be length 3 or 7.')
str = ''
for idim in dimensions:
str += f'{idim} '
dimensions = f'[{str.strip()}]'
return dimensions
# create string
file_str = foam_header_logo(foam_version, website)
file_str += '\n' + foam_header_info(name, foam_class, location)
file_str += '\n' + foam_sep()
file_str += '\n'*2 + f'dimensions {_foam_dimensions(dimensions)};'
file_str += '\n'*2 + 'internalField '
file_str += _foam_field(
internal_field['uniform'], internal_field['value'], foam_class) + ';'
file_str += '\n\nboundaryField\n{'
for bc in boundaries:
file_str += '\n' + ' '*4 + bc["name"] + '\n' + ' '*4 + '{'
file_str += '\n' + ' '*8 + 'type' + ' '*12 + bc["type"] + ';'
write_value = False
if 'value' in bc:
if bc['value'] is not None:
write_value = True
if write_value:
data = _foam_field(
bc['value']['uniform'], bc['value']['data'], foam_class)
file_str += '\n' + ' '*8 + 'value' + ' '*11 + data + ';'
file_str += '\n' + ' '*4 + '}'
file_str += '\n}\n' + foam_sep()
# write to file
if file is None:
file = name
with open(file, 'w') as f:
f.write(file_str)
return os.path.abspath(file), file_str
[docs]def write(version, fieldname, internal_field, boundaries, location=None,
file=None):
""" Write an OpenFOAM field file for one of the pre-specified fields.
The implemented fields are: 'p', 'k', 'epsilon', 'omega', 'nut',
'Cx', 'Cy', 'Cz', 'V', 'U', 'C', 'Tau', 'grad(U)'.
This calls :py:meth:`write_field_file` but requires less information.
Parameters
----------
version : str
OpenFOAM version. Must be length 1 (e.g. '7') or 4 (e.g. '1912').
fieldname : str
One of the pre-defined fields.
internal_field : dictionary
See :py:meth:`write_field_file`.
boundaries : list
See :py:meth:`write_field_file`.
location : str
See :py:meth:`write_field_file`.
file : str
See :py:meth:`write_field_file`.
"""
def field_info(fieldname):
def get_foam_class(fieldname):
scalarlist = ['p', 'k', 'epsilon', 'omega', 'nut', 'Cx', 'Cy',
'Cz', 'V']
if fieldname in scalarlist:
foam_class = 'scalar'
elif fieldname in ['U', 'C']:
foam_class = 'vector'
elif fieldname == 'Tau':
foam_class = 'symmTensor'
elif fieldname == 'grad(U)':
foam_class = 'tensor'
return foam_class
def get_dimensions(fieldname):
# 1 - Mass (kg)
# 2 - Length (m)
# 3 - Time (s)
# 4 - Temperature (K)
# 5 - Quantity (mol)
# 6 - Current (A)
# 7 - Luminous intensity (cd)
if fieldname == 'U':
dimensions = '[ 0 1 -1 0 0 0 0]'
elif fieldname in ['p', 'k', 'Tau']:
dimensions = '[0 2 -2 0 0 0 0]'
elif fieldname == 'phi':
dimensions = '[0 3 -1 0 0 0 0]'
elif fieldname == 'epsilon':
dimensions = '[0 2 -3 0 0 0 0]'
elif fieldname in ['omega', 'grad(U)']:
dimensions = '[0 0 -1 0 0 0 0]'
elif fieldname == 'nut':
dimensions = '[0 2 -1 0 0 0 0]'
elif fieldname in ['C', 'Cx', 'Cy', 'Cz']:
dimensions = '[0 1 0 0 0 0 0]'
elif fieldname == 'V':
dimensions = '[0 3 0 0 0 0 0]'
return dimensions
field = {'name': fieldname,
'class': get_foam_class(fieldname),
'dimensions': get_dimensions(fieldname)}
return field
def version_info(version):
# string ('7' or '1912')
website = 'www.openfoam.'
if len(version) == 1:
version += '.x'
website += 'org'
elif len(version) == 4:
version = 'v' + version
website += 'com'
foam = {'version': version,
'website': website}
return foam
foam = version_info(version)
field = field_info(fieldname)
file_path, file_str = write_field_file(
foam_version=foam['version'],
website=foam['website'],
name=field['name'],
foam_class=field['class'],
dimensions=field['dimensions'],
internal_field=internal_field,
boundaries=boundaries,
location=location,
file=file)
return file_path, file_str
[docs]def write_p(version, internal, boundaries, location=None, file=None):
""" Write a pressure field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'p', internal, boundaries, location, file)
[docs]def write_U(version, internal, boundaries, location=None, file=None):
""" Write a velocity field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'U', internal, boundaries, location, file)
[docs]def write_Tau(version, internal, boundaries, location=None, file=None):
""" Write a Reynolds stress field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'Tau', internal, boundaries, location, file)
[docs]def write_nut(version, internal, boundaries, location=None, file=None):
""" Write an eddy viscosity field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'nut', internal, boundaries, location, file)
[docs]def write_k(version, internal, boundaries, location=None, file=None):
""" Write a turbulent kinetic energy (TKE) field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'k', internal, boundaries, location, file)
[docs]def write_epsilon(version, internal, boundaries, location=None, file=None):
""" Write a TKE dissipation rate field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'epsilon', internal, boundaries, location, file)
[docs]def write_omega(version, internal, boundaries, location=None, file=None):
""" Write a TKE specific dissipation rate field file.
See :func:`~dafi.random_field.foam_utilities.write` for more
information.
"""
return write(version, 'omega', internal, boundaries, location, file)