'''
Working-point models in the Q-H (flow-head) plane.
This module defines classes used to represent and update operating points of hydraulic systems, especially for pump/circuit analysis and plotting.
Main classes:
* ``Wpoint``: static working point storing fixed ``Q`` and ``H`` values.
* ``WpointDyn``: dynamic working point linked to pump/circuit behavior, recalculating operating conditions when upstream model parameters change.
Typical use cases:
* annotate operating points on Q-H diagrams,
* compute updated intersection points after speed or resistance changes,
* drive interactive plotting workflows where pump/circuit parameters vary.
Typical usage::
wp = Wpoint(name='nominal', Q=5 * u.m**3 / u.h, H=12 * u.m)
dyn = WpointDyn(name='op', pump=pump, circuit=circuit)
dyn.recalc()
print(dyn.Q, dyn.H)
'''
# =============================================================================
# PYLINT DIRECTIVES
# =============================================================================
# =============================================================================
# IMPORTS
# =============================================================================
from typing import Any
import math
import warnings
from scipy.optimize import root
# module own
import fluidsolve.aux_tools as flsa
import fluidsolve.medium as flsm
import fluidsolve.comp_base as flsb
# units
u = flsm.unitRegistry
Quantity = flsm.Quantity # type: ignore[misc]
# =============================================================================
# WORKING POINT CLASSES
# =============================================================================
# =============================================================================
[docs]
class Wpoint ():
''' Static working point in the Q-H plane.
Args:
name (str, optional): Working point label.
Q (int | float | Quantity, optional): Flow rate (default in m3/h).
H (int | float | Quantity, optional): Head (default in m).
'''
[docs]
def __init__(self, **kwargs: int) -> None:
args_in = flsa.GetArgs(kwargs)
self._name: str = args_in.getArg(
'name',
[
flsa.vFun.default(''),
flsa.vFun.istype((str)),
]
)
self._Q: Quantity = args_in.getArg(
'Q',
[
flsa.vFun.default(0.0 * u.m**3/u.h),
flsa.vFun.istype((int, float, Quantity)),
flsa.vFun.tounits(u.m**3/u.h),
]
)
self._H: Quantity = args_in.getArg(
'H',
[
flsa.vFun.default(0.0 * u.m),
flsa.vFun.istype((int, float, Quantity)),
flsa.vFun.tounits(u.m),
]
)
@property
def name(self) -> str:
''' Name of the working point. '''
return self._name
@name.setter
def name(self, value: str) -> None:
''' Set name property.
Args:
value (str): Name.
'''
self._name = value
@property
def Q(self) -> Quantity:
''' Flow rate (in m3/h). '''
return self._Q.to(u.m**3/u.h)
@Q.setter
def Q(self, value: int | float | Quantity) -> None:
''' Set flow property.
Args:
value (int | float | Quantity): Flow (default in m3/h).
'''
self._Q = flsa.toUnits(value, u.m**3/u.h)
@property
def H(self) -> Quantity:
''' Head (in m). '''
return self._H.to(u.m)
@H.setter
def H(self, value: int | float | Quantity) -> None:
''' Set head property.
Args:
value (int | float | Quantity): Head (default in m).
'''
self._H = flsa.toUnits(value, u.m)
@property
def Qmag(self) -> float:
''' Flow rate magnitude (in m3/h). '''
return self._Q.to(u.m**3/u.h).magnitude
@property
def Hmag(self) -> float:
''' Head magnitude (in m). '''
return self._H.to(u.m).magnitude
[docs]
def update(self) -> 'Wpoint':
''' Update Q and H. In this base class returns self unchanged.
Returns:
Wpoint: self.
'''
return self
[docs]
def toString(self, detail: int=0) -> str: # pylint: disable=unused-argument
if self._name=='':
return f'Pt: Q: {self._Q.to(u.m**3/u.h):.2f~P}, H: {self.H.to(u.m):.2f~P}'
else:
return f'Pt {self._name}: Q: {self._Q.to(u.m**3/u.h):.2f~P}, H: {self.H.to(u.m):.2f~P}'
[docs]
def __str__(self) -> str:
return self.toString(0)
def __format__(self, format_spec: str) -> str:
if format_spec == '':
return str(self)
try:
detail = int(format_spec)
except ValueError as exc:
raise ValueError(f'Invalid format spec for {type(self).__name__}: {format_spec!r}') from exc
return self.toString(detail)
[docs]
def __repr__(self) -> str:
if self._name=='':
return f'Pt: Q: {self._Q.to(u.m**3/u.h):.2f~P}, H: {self.H.to(u.m):.2f~P}'
else:
return f'Pt {self._name}: Q: {self._Q.to(u.m**3/u.h):.2f~P}, H: {self.H.to(u.m):.2f~P}'
# =============================================================================
[docs]
class WpointDyn (Wpoint):
''' Dynamic working point that recalculates Q and H from two components.
Args:
s1 (Comp_Base, optional): First component (e.g. pump curve).
s2 (Comp_Base, optional): Second component (e.g. system curve).
guess (int | float | list, optional): Initial flow guess for the solver.
name (str, optional): Working point label.
Q (int | float | Quantity, optional): Initial flow rate (default in m3/h).
H (int | float | Quantity, optional): Initial head (default in m).
'''
[docs]
def __init__(self, **kwargs: int) -> None:
args_in: dict = flsa.GetArgs(kwargs)
self._s1: Any = args_in.getArg(
's1',
[
flsa.vFun.default(None),
flsa.vFun.istype(flsb.Comp_Base),
]
)
self._s2: Any = args_in.getArg(
's2',
[
flsa.vFun.default(None),
flsa.vFun.istype(flsb.Comp_Base),
]
)
self._guess : int | float | list =args_in.getArg(
'guess',
[
flsa.vFun.default(200),
flsa.vFun.istype(int, float, list),
]
)
# base class init
super().__init__(**args_in.restArgs())
# some calculations
self.update()
[docs]
def update(self) -> 'WpointDyn':
''' Recalculate Q and H from the two components.
Returns:
WpointDyn: self.
'''
if self._s1 is not None and self._s2 is not None:
self._Q, self._H = calcOperatingPoint(self._s1, self._s2, self._guess)
return self
# =============================================================================
# SOLVERS
# =============================================================================
# =============================================================================
[docs]
def calcOperatingPoint(c1: Any, c2:Any, guess: Any=None) -> tuple:
''' Calculate the operating point for two intersecting curves.
Typically a pump curve and a system curve; each component may itself
be a composite (e.g. Comp_Serial).
Args:
c1 (Comp_Base): First component.
c2 (Comp_Base): Second component.
guess (int | float | list, optional): Initial flow guess for the solver.
Returns:
tuple[Quantity, Quantity]: Operating point (Q, H).
'''
def F(Q: int | float) -> Any:
#return (abs(c1.calcH(Q, 1)) - abs(c2.calcH(Q, 1))).magnitude # this is not ok, you resume there is a sign change, we need to keep the sign to find the correct root
return (c1.calcH(Q, 1) + c2.calcH(Q, 1)).magnitude
if guess is None:
x0_list = [0.5, 2.0, 10.0, 50.0, 200.0]
elif isinstance(guess, (list, tuple)):
x0_list = [float(item) for item in guess]
else:
x0_list = [float(guess)]
last_msg = f'calcOperatingPoint({c1.name}, {c2.name}) did not converge.'
methods = ('hybr', 'lm', 'df-sane')
# Try each method for each initial guess until a finite solution is found.
for x0 in x0_list:
for method in methods:
root_res = root(F, x0=x0, method=method)
if not root_res.success:
msg = str(root_res.message)
last_msg = f'calcOperatingPoint({c1.name}, {c2.name}): {msg}' if msg else f'calcOperatingPoint({c1.name}, {c2.name}): root[{method}] failed with status={root_res.status}'
continue
x_val = root_res.x
if hasattr(x_val, 'flat'):
q_mag = float(x_val.flat[0])
elif isinstance(x_val, (list, tuple)):
q_mag = float(x_val[0])
else:
q_mag = float(x_val)
if not math.isfinite(q_mag) or q_mag < 0.0:
last_msg = f'calcOperatingPoint({c1.name}, {c2.name}): invalid root Q={q_mag}'
continue
break
else:
continue
break
else:
warnings.warn(last_msg, RuntimeWarning, stacklevel=2)
return (0.0 * u.m**3 / u.h, 0.0 * u.m)
# process result
Q = q_mag * u.m**3/u.h
H = abs(c2.calcH(Q, 1))
return (Q, H)