'''
Pump component models and Q-H curve handling.
This module implements hydraulic pump components that act as energy sources
in the network solver. Pump behavior is primarily defined through Q-H curve
data and operating speed scaling.
Main features:
* generic pump base class with shared pump metadata,
* curve-driven head evaluation over flow,
* speed-dependent behavior through rated vs operating speed,
* support for catalogue-driven centrifugal pump definitions.
Design conventions:
* pumps add energy (``sign = +1``),
* mounting direction is port 1 -> port 2,
* flow direction is provided through ``sense``,
* ``sense = +1`` follows mounting direction,
* ``sense = -1`` represents reverse-flow use (no useful pump head),
* pump curves are interpreted as positive-head source data.
Implementation notes:
* curve interpolation is used to evaluate head at intermediate flow values,
* flow/head units are normalized with the shared unit registry,
* curve sampling helpers are available for plotting and working-point tools.
Typical usage::
pump = Comp_PumpCentrifugal(
dataQH=[0, 20, 10, 18, 20, 12],
speed0=2900,
speed=2600,
)
H = pump.calcH(8 * u.m**3 / u.h, sense=1)
These pump classes are typically instantiated via the core factory helpers,
which provide consistent naming and defaults across a full hydraulic model.
'''
# =============================================================================
# PYLINT DIRECTIVES
# =============================================================================
# =============================================================================
# IMPORTS
# =============================================================================
from typing import Callable, List, Any
import numpy as np
from scipy.interpolate import interp1d
# module own
import fluidsolve.aux_tools as flsa
import fluidsolve.medium as flsme
import fluidsolve.comp_base as flsb
# units
u = flsme.unitRegistry
Quantity = flsme.Quantity # type: ignore[misc]
# =============================================================================
# CONSTANTS
# =============================================================================
N_CURVE_POINTS = 100
# =============================================================================
# GENERIC PUMP
# =============================================================================
[docs]
class Comp_Pump(flsb.Comp_Base): # pylint: disable=invalid-name
''' Generic pump component class.
Args:
vendor (str, optional): Vendor name.
spec (str, optional): Pump specification or type.
din (int | float | Quantity, optional): Pump inlet diameter (mm).
dout (int | float | Quantity, optional): Pump outlet diameter (mm).
speed0 (int | float | Quantity): Pump rated speed (in rpm).
speed (int | float | Quantity, optional): Pump operating speed (rpm).
hasdata (bool, optional): Whether Q-H curve data is required.
dataQH (list, optional): Q-H curve data as
[Q0, H0, Q1, H1, ... , Qn, Hn].
Returns:
None
'''
# --------------------------------------------------------------------------
# FIXED PROPERTIES
_group : str = 'Pump'
_part : str = 'Generic'
_prefix : str = 'P'
_sign : float = +1.0
# --------------------------------------------------------------------------
# INITIALIZE
[docs]
def __init__(self, **kwargs: Any) -> Any:
# arguments
args_in = flsa.GetArgs(kwargs)
self._vendor = args_in.getArg(
'vendor',
[flsa.vFun.default('undefined'),
flsa.vFun.istype(str)]
)
self._spec = args_in.getArg(
'spec',
[flsa.vFun.default('undefined'),
flsa.vFun.istype(str)]
)
self._din: str = args_in.getArg(
'din',
[
flsa.vFun.default(0.0 * u.m),
flsa.vFun.istype(int, float, Quantity),
flsa.vFun.tounits(u.mm),
]
)
self._dout: str = args_in.getArg(
'dout',
[
flsa.vFun.default(0.0 * u.m),
flsa.vFun.istype(int, float, Quantity),
flsa.vFun.tounits(u.mm),
]
)
self._speed0 = args_in.getArg(
'speed0',
[flsa.vFun.istype(int, float, Quantity),
flsa.vFun.tounits(u.rpm)]
)
self._speed = args_in.getArg(
'speed',
[flsa.vFun.default(self._speed0),
flsa.vFun.istype(int, float, Quantity),
flsa.vFun.tounits(u.rpm)]
)
# curve
self._Qb : Quantity = None
self._Qe : Quantity = None
self._Qc : Quantity = None
self._Hb : Quantity = None
self._He : Quantity = None
self._dataQ0 : list = None
self._dataH0 : list = None
self._dataQ : list = None
self._dataH : list = None
self._funQtoH : Callable = None
self._funHtoQ : Callable = None
#
hasdata = args_in.getArg(
'hasdata',
[flsa.vFun.default(True),
flsa.vFun.istype(bool)]
)
dataQH = args_in.getArg(
'dataQH',
[flsa.vFun.default([]),
flsa.vFun.istype(list)]
)
if hasdata:
if not dataQH:
raise ValueError(f'No pump data (dataQH: {dataQH})')
arr = np.reshape(dataQH, (-1, 2))
self._dataQ0 = arr[:, 0]
self._dataH0 = arr[:, 1]
# base class init
super().__init__(**args_in.restArgs())
# some calculations
self.updateCurve()
# --------------------------------------------------------------------------
# PROPERTIES
@property
def vendor(self) -> str:
''' Pump vendor.
Returns:
str: Pump vendor.
'''
return self._vendor
@property
def spec(self) -> str:
''' Pump specification (for example model or type number).
Returns:
str: Pump specification.
'''
return self._spec
@property
def din(self) -> Quantity:
''' Pump inlet diameter (mm).
Returns:
Quantity: Pump inlet diameter.
'''
if self._din is None:
return None
else:
return self._din.to(u.mm)
@property
def dout(self) -> Quantity:
''' Pump outlet diameter (mm).
Returns:
Quantity: Pump outlet diameter.
'''
if self._dout is None:
return None
else:
return self._dout.to(u.mm)
@property
def speed0(self) -> Quantity:
''' Pump rated speed (rpm).
Returns:
Quantity: Pump rated speed.
'''
return self._speed0
@property
def speed(self) -> int | float:
''' Pump operating speed (rpm).
Returns:
int | float: Pump operating speed.
'''
return self._speed
@speed.setter
def speed(self, value: int | float | Quantity) -> None:
''' Set pump operating speed.
Args:
value (int | float | Quantity): Operating speed (rpm).
'''
self._speed = flsa.toUnits(value, u.rpm)
self.updateCurve()
@property
def Qb(self) -> Quantity:
''' Minimum flow rate (m3/h).
Returns:
Quantity: Flow rate.
'''
return self._Qb
@property
def Qe(self) -> Quantity:
''' Maximum flow rate (m3/h).
Returns:
Quantity: Flow rate.
'''
return self._Qe
@property
def Qc(self) -> Quantity:
''' Critical flow rate (m3/h).
Returns:
Quantity: Critical flow rate.
'''
return self._Qc
@property
def Hb(self) -> Quantity:
''' Lower curve-bound marker.
Returns:
Quantity: Lower bound marker.
'''
return self._Hb
@property
def He(self) -> Quantity:
''' Upper curve-bound marker.
Returns:
Quantity: Upper bound marker.
'''
return self._He
@property
def dataH(self) -> list:
''' curve H data.
Returns:
list: curve H data.
'''
return self._dataH
@property
def dataQ(self) -> list:
''' curve Q data.
Returns:
list: curve Q data.
'''
return self._dataQ
# --------------------------------------------------------------------------
# PHYSICS
[docs]
def calcH(self, Q: Quantity, sense: int=1, pin: int=1, pout:int=2) -> Quantity:
''' Calculate pump head.
Reverse flow produces no head.
Args:
Q (int | float | Quantity): Flow rate (default unit: m3/h).
sense (int): Flow direction indicator (+1 or -1).
pin (int): Inlet port number.
pout (int): Outlet port number.
Returns:
Quantity: Pump head (m).
'''
lQ = flsa.toUnits(Q, u.m**3/u.h).magnitude
lsense = sense if pin < pout else -sense
# Effective flow sign in the pump's own 1->2 mounting direction.
flow_sign = np.sign(lQ) * lsense
H = self._funQtoH(np.abs(lQ))
#print(f'pump---{self._name}-------------------', lQ, sense, pin, pout, flow_sign, H)
if isinstance(H, np.ndarray):
H = np.asarray(H, dtype=float)
H[flow_sign < 0] = 0
H[H <= 0] = 0
else:
if flow_sign < 0:
H = 0
else:
H = max(0, H)
return H * u.m
[docs]
def calcQ(self, H: Quantity, guess: Any=200, sense: int=1, pin: int=1, pout:int=2) -> Quantity: # pylint: disable=unused-argument
''' Calculate flow rate from head.
Args:
H (int | float | Quantity): Head (default unit: m).
guess (Any): Unused in pump inverse curve lookup; kept for API compatibility.
sense (int): Flow direction indicator (+1 or -1).
pin (int): Inlet port number.
pout (int): Outlet port number.
Returns:
Quantity: Flow rate (in m3/h).
'''
lH = flsa.toUnits(H, u.m)
Q = self._funHtoQ(lH.magnitude)
if isinstance(Q, np.ndarray):
Q[Q <= 0] = 0
else:
Q = max(0, Q)
return Q * u.m**3/u.h
# --------------------------------------------------------------------------
# UTILITIES
[docs]
def updateCurve(self) -> Any:
''' Update curve arrays and interpolation functions. '''
self._dataQ = self._dataQ0
self._dataH = self._dataH0
# probably in future: interp1d obsolete
self._funQtoH = interp1d(self._dataQ, self._dataH, fill_value='extrapolate')
self._funHtoQ = interp1d(self._dataH, self._dataQ, fill_value='extrapolate')
#self._coeffQtoH = np.polyfit(self._dataQ, self._dataH, 2)
#self._coeffHtoQ = np.polyfit(self._dataH, self._dataQ, 2)
# curve min and max points
self._Qb = min(self._dataQ) * u.m**3/u.h
self._Qc = self._Qb
self._Qe = max(self._dataQ) * u.m**3/u.h
self._Hb = min(self._dataH) * u.m
self._He = max(self._dataH) * u.m
# --------------------------------------------------------------------------
# REPRESENTATION
[docs]
def toString(self, detail: int=0) -> str:
''' Return string representation.
Args:
detail (int, optional): Detail level.
Returns:
str: String representation.
'''
txt = super().toString(detail) + '\n' \
+ f'Pump: {self._vendor}: {self._spec}\n' \
+ f' Din:{self._din}, Dout:{self._dout}, speed0:{self._speed0}, speed:{self._speed}\n'
return txt
# =============================================================================
# CENTRIFUGAL PUMP
# =============================================================================
[docs]
class Comp_PumpCentrifugal(Comp_Pump): # pylint: disable=invalid-name
''' Centrifugal pump component.
Args:
impeller0 (int | float | Quantity): Pump rated impeller (in mm).
impeller (int | float | Quantity, optional): Pump operating impeller diameter (mm).
Returns:
None
'''
# --------------------------------------------------------------------------
# FIXED PROPERTIES
_part : str = 'Centrifugal'
# --------------------------------------------------------------------------
# INITIALIZE
[docs]
def __init__(self, **kwargs: Any) -> Any:
# arguments
args_in = flsa.GetArgs(kwargs)
self._impeller0 = args_in.getArg(
'impeller0',
[flsa.vFun.istype(int, float, Quantity),
flsa.vFun.tounits(u.mm)]
)
self._impeller = args_in.getArg(
'impeller',
[flsa.vFun.default(self._impeller0),
flsa.vFun.istype(int, float, Quantity),
flsa.vFun.tounits(u.mm)]
)
# base class init
super().__init__(**args_in.restArgs())
# --------------------------------------------------------------------------
# PROPERTIES
@property
def impeller0(self) -> Quantity:
''' Pump rated impeller size (mm).
Returns:
Quantity: Pump rated impeller size.
'''
return self._impeller0
@property
def impeller(self) -> Quantity:
''' Pump operating impeller size (mm).
Returns:
Quantity: Pump operating impeller size.
'''
return self._impeller
# --------------------------------------------------------------------------
# PHYSICS
# --------------------------------------------------------------------------
# UTILITIES
[docs]
def updateCurve(self) -> None:
''' Update curve arrays and interpolation functions. '''
# impact of speed - impeller size
if self._speed == self._speed0:
self._dataQ = self._dataQ0
self._dataH = self._dataH0
else:
factor = (self._speed / self._speed0 * self._impeller / self._impeller0).magnitude
self._dataQ = self._dataQ0 * factor
self._dataH = self._dataH0 * factor ** 2
# cut negative H
ptrim = np.argmax(self._dataH<=0)
if ptrim>0:
self._dataQ = self._dataQ[:ptrim]
self._dataH = self._dataH[:ptrim]
# probably in future: interp1d obsolete
self._funQtoH = interp1d(self._dataQ, self._dataH, fill_value='extrapolate')
self._funHtoQ = interp1d(self._dataH, self._dataQ, fill_value='extrapolate')
# curve begin, end, critical point
self._Qb = self._dataQ[0] * u.m**3/u.h
self._Qe = self._dataQ[-1] * u.m**3/u.h
Qpts = np.linspace(start=self._Qb.magnitude, stop=self._Qe.magnitude, num=N_CURVE_POINTS, endpoint=True)
Hpts = self._funQtoH(Qpts)
self._Qc = Qpts[np.argmax(Hpts)] * u.m**3/u.h
# --------------------------------------------------------------------------
# REPRESENTATION
[docs]
def toString(self, detail: int=0) -> str:
''' Return string representation.
Args:
detail (int, optional): Detail level.
Returns:
str: String representation.
'''
txt = f'Pump: {self._part} : {self._vendor}: {self._spec}\n' + \
f' Din:{self._din}, Dout:{self._dout}, Impeller0:{self._impeller0}, Impeller:{self._impeller}, Speed0:{self._speed0}, Speed:{self._speed}\n'
return txt
# =============================================================================
# SERIAL PUMPS
# =============================================================================
[docs]
class Comp_PumpSerial(Comp_Pump): # pylint: disable=invalid-name
''' Pumps connected in series.
Heads add, flow is identical.
'''
# --------------------------------------------------------------------------
# FIXED PROPERTIES
_part : str = 'PumpSerial'
# --------------------------------------------------------------------------
# INITIALIZE
[docs]
def __init__(self, **kwargs: Any) -> Any:
# arguments
args_in = flsa.GetArgs(kwargs)
args_in.addArgs({
'hasdata': False,
'speed0': 1*u.rpm,
})
self._pumps: List[Comp_Pump] = args_in.getArg(
'pumps',
[flsa.vFun.istype(list),
flsa.vFun.lenmin(1)]
)
# base class init
super().__init__(**args_in.restArgs())
# --------------------------------------------------------------------------
# PHYSICS
[docs]
def calcH(self, Q: int | float | Quantity, sense: int=1, pin: int=1, pout:int=2) -> Quantity:
''' Calculate head for the serial pump assembly.
Args:
Q (int | float | Quantity): Flow rate (default unit: m3/h).
sense (int): Flow direction indicator (+1 or -1).
pin (int): Inlet port number.
pout (int): Outlet port number.
Returns:
Quantity: Head (m).
'''
if (sense < 0 and pin < pout) or (sense > 0 and pin > pout):
return 0.0 * u.m
lQ = flsa.toUnits(Q, u.m**3/u.h)
return self._funQtoH(lQ.magnitude) * u.m
[docs]
def calcQ(self, H: int | float | Quantity, guess: Any=200, sense: int=1, pin: int=1, pout:int=2) -> Quantity:
''' Calculate flow rate for the serial pump assembly.
Args:
H (int | float | Quantity): Head (default unit: m).
guess (Any): Unused; kept for API compatibility.
sense (int): Flow direction indicator (+1 or -1).
pin (int): Inlet port number.
pout (int): Outlet port number.
Returns:
Quantity: Flow rate (in m3/h).
'''
lH = flsa.toUnits(H, u.m)
return self._funHtoQ(lH.magnitude) * u.m**3/u.h
# --------------------------------------------------------------------------
# UTILITIES
[docs]
def updateCurve(self) -> Any:
''' Update curve arrays and interpolation functions. '''
for pump in self._pumps:
if self._Qb is None or pump.Qb < self._Qb:
self._Qb = pump.Qb
if self._Qe is None or pump.Qe > self._Qe:
self._Qe = pump.Qe
if self._Qc is None or pump.Qc > self._Qc:
self._Qc = pump.Qc
self._dataQ = np.linspace(start=self._Qb.magnitude, stop=self._Qe.magnitude, num=N_CURVE_POINTS, endpoint=True)
self._dataH = sum(pump.calcH(Q=self._dataQ).magnitude for pump in self._pumps)
self._funQtoH = interp1d(self._dataQ, self._dataH, fill_value='extrapolate')
self._funHtoQ = interp1d(self._dataH, self._dataQ, fill_value='extrapolate')
# curve critical point
self._Qc = self._dataQ[np.argmax(self._dataH)] * u.m**3/u.h
# --------------------------------------------------------------------------
# REPRESENTATION
[docs]
def toString(self, detail: int=0) -> str:
''' Return string representation.
Args:
detail (int, optional): Detail level.
Returns:
str: String representation.
'''
sdetail = detail // 10
txt = 'Serial pumps:'
for pump in self._pumps:
txt += f' {pump.toString(sdetail)}'
return txt
# =============================================================================
# PARALLEL PUMPS
# =============================================================================
[docs]
class Comp_PumpParallel(Comp_Pump): # pylint: disable=invalid-name
''' Pumps connected in parallel.
Flows add, head is identical.
'''
# --------------------------------------------------------------------------
# FIXED PROPERTIES
_part : str = 'PumpParallel'
# --------------------------------------------------------------------------
# INITIALIZE
[docs]
def __init__(self, **kwargs: Any) -> Any:
# arguments
args_in = flsa.GetArgs(kwargs)
args_in.addArgs({
'hasdata': False,
'speed0': 1*u.rpm,
})
self._pumps: List[Comp_Pump] = args_in.getArg(
'pumps',
[flsa.vFun.istype(list),
flsa.vFun.lenmin(1)]
)
# base class init
super().__init__(**args_in.restArgs())
# --------------------------------------------------------------------------
# PHYSICS
[docs]
def calcH(self, Q: int | float | Quantity, sense: int=1, pin: int=1, pout:int=2) -> Quantity:
''' Calculate head for the parallel pump assembly.
Args:
Q (int | float | Quantity): Flow rate (default unit: m3/h).
sense (int): Flow direction indicator (+1 or -1).
pin (int): Inlet port number.
pout (int): Outlet port number.
Returns:
Quantity: Head (m).
'''
lQ = flsa.toUnits(Q, u.m**3/u.h)
return self._funQtoH(lQ.magnitude) * u.m
[docs]
def calcQ(self, H: int | float | Quantity, guess: Any=200, sense: int=1, pin: int=1, pout:int=2) -> Quantity: # pylint: disable=unused-argument
''' Calculate flow rate for the parallel pump assembly.
Args:
H (int | float | Quantity): Head (default unit: m).
guess (Any): Unused; kept for API compatibility.
sense (int): Flow direction indicator (+1 or -1).
pin (int): Inlet port number.
pout (int): Outlet port number.
Returns:
Quantity: Flow rate (in m3/h).
'''
lH = flsa.toUnits(H, u.m)
return self._funHtoQ(lH.magnitude) * u.m**3/u.h
# --------------------------------------------------------------------------
# UTILITIES
[docs]
def updateCurve(self) -> None:
''' Update curve arrays and interpolation functions. '''
Hmin = None
Hmax = None
for pump in self._pumps:
if self._Qb is None or pump.Qb < self._Qb:
self._Qb = pump.Qb
if self._Qe is None or pump.Qe > self._Qe:
self._Qe = pump.Qe
if self._Qc is None or pump.Qc > self._Qc:
self._Qc = pump.Qc
Hma = max(pump.calcH(Q=pump.Qb), pump.calcH(Q=pump.Qc), pump.calcH(Q=pump.Qe))
Hmi = min(pump.calcH(Q=pump.Qb), pump.calcH(Q=pump.Qc), pump.calcH(Q=pump.Qe))
if Hmin is None or Hmi < Hmin:
Hmin = Hmi
if Hmax is None or Hma > Hmax:
Hmax = Hma
Hmin = max(Hmin, 0 * u.m)
#self._dataH0 = np.linspace(start=0, stop=Hm.magnitude, num=100, endpoint=True)
self._dataH0 = np.linspace(start=Hmin.magnitude, stop=Hmax.magnitude, num=N_CURVE_POINTS, endpoint=True)
self._dataQ0 = np.zeros(len(self._dataH0))
for pump in self._pumps:
hh = pump.calcQ(H=self._dataH0).magnitude
self._dataQ0 += hh
tr = np.argmax(self._dataQ0<=0)
if tr>0:
self._dataQ0 = self._dataQ0[:tr]
self._dataH0 = self._dataH0[:tr]
#print('tr',Hmin, Hmax, tr, self._dataQ0, self._dataH0)
#self._dataQ0 = sum([pump.calcQ(H=self._dataH0).magnitude for pump in self._pumps])
#i = np.where(self._dataQ0>0.01)
#self._dataH0 = self._dataH0[1:j]
#self._dataQ0 = self._dataQ0[1:j]
self._dataQ = self._dataQ0
self._dataH = self._dataH0
self._Qb = self._dataQ0[-1] * u.m**3/u.h
self._Qe = self._dataQ0[0] * u.m**3/u.h
self._funQtoH = interp1d(self._dataQ0, self._dataH0, fill_value='extrapolate')
self._funHtoQ = interp1d(self._dataH0, self._dataQ0, fill_value='extrapolate')
# --------------------------------------------------------------------------
# REPRESENTATION
[docs]
def toString(self, detail: int=0) -> str:
''' Return string representation.
Args:
detail (int, optional): Detail level.
Returns:
str: String representation.
'''
sdetail = detail // 10
txt = 'Parallel pumps:'
for pump in self._pumps:
txt += f' {pump.toString(sdetail)}'
return txt