Code
import pandas as pd
import csv
# Read the pipe-delimited rules file; each rule becomes a list of stripped strings
with open('rules.csv') as rules_file:
    rules_table = [
        [cell.strip() for cell in record.values()]
        for record in csv.DictReader(rules_file, delimiter='|')
    ]

# Read the comma-separated data file into a DataFrame
df = pd.read_csv('data.csv', sep=",")
def rules_eval(row, rules):
    """Return the result of the first rule chain that matches ``row``.

    Rules are scanned in table order. Consecutive rules sharing a rule ID
    form a chain: once one condition of a chain fails, the remaining rules
    with that ID are skipped. When a condition holds and its operand is
    anything other than ``'and'``, that rule's result is returned at once.

    Args:
        row: Record indexable by column name (e.g. a DataFrame row or dict).
        rules: Iterable of 6-item sequences
            ``(column, operator, value, operand, rule_id, result)``.

    Returns:
        The result of the first satisfied rule whose operand is not
        ``'and'``, or ``None`` when no chain matches.

    Raises:
        ValueError: If an evaluated rule uses an operator other than
            ``'Equal'``.
    """

    def operator_eval(op, col, value):
        """Evaluate one condition against the enclosing ``row``."""
        if op == 'Equal':
            # Compare string forms of both sides
            return str(row[col]) == str(value)
        # Currently only Equal supported
        raise ValueError(f"Unsupported Operator Value {op}, only Equal allowed")

    # A unique sentinel can never collide with a real rule ID, and the flag
    # is bound up front so it is always defined when first read.
    prev_rule = object()
    ignore_rule = False
    for col, op, val, operand, rule, res in rules:
        if prev_rule != rule:
            # Rule ID changed, so a new chain starts and may be followed
            ignore_rule = False
        if not ignore_rule:
            if operator_eval(op, col, val):
                if operand != 'and':
                    return res
            else:
                # A condition of this chain failed:
                # skip the remaining rules with this ID
                ignore_rule = True
        prev_rule = rule
    return None
# Evaluate the rules for every row and store the outcome in a new column
df['results'] = df.apply(rules_eval, axis=1, args=(rules_table,))
print(df)
Sortie
ABC CDE XYZ results
0 12 10 AD 1.5
1 11 10 AD 1.2
Explication
df.apply applique la fonction rules_eval
à chaque ligne du DataFrame.
Le résultat est placé dans la colonne "results" via
df['results'] = ...
Priorité de la règle de traitement
Changement
Ajout d'une colonne Priorité dans la table rules_table afin que les règles ayant le même RuleID soient traitées par ordre de priorité.
L'ordre de priorité est déterminé par l'ordre des tuples ajoutés au tas, actuellement (Priority, Rule), où Rule contient les champs
Column_Name, Operator, Column_Value1, Operand, RuleID, Priority, Result
Code
import pandas as pd
import csv
from collections import namedtuple
from heapq import (heappush, heappop)
# Read the comma-separated data file that the rules will be applied to
df = pd.read_csv('data.csv', delimiter=',')
class RulesEngine:
    """Evaluate groups of rules loaded from a ``|``-delimited file.

    Consecutive rule rows that share a ``RuleID`` form a group. A group
    matches a data row when every rule in it holds; rules are checked in
    ascending ``Priority`` order, and the value produced for a match is the
    ``Result`` of the last rule checked.
    """

    ###########################################
    # Static members
    ###########################################
    # Column layout of one rule row, in file order
    fieldnames = 'Column_Name|Operator|Column_Value1|Operand|RuleID|Priority|Result'
    # Named tuple for rules
    Rule = namedtuple('Rule', fieldnames.replace('|', ' '))
    # Number of values every rule row must supply
    number_fields = fieldnames.count('|') + 1

    ###########################################
    # members
    ###########################################
    def __init__(self, table_file):
        """Load and validate the rules table from ``table_file``.

        Args:
            table_file: Path of a ``|``-delimited file whose first line is
                a header and whose remaining lines each hold one rule.

        Raises:
            ValueError: If a row does not hold exactly ``number_fields``
                values (too few or too many).
        """
        rules_table = []
        # newline='' leaves line-ending handling to the csv module
        with open(table_file, newline='') as csvfile:
            for row in csv.DictReader(csvfile, delimiter='|'):
                fields = []
                for value in row.values():
                    if value is None:
                        # DictReader pads short rows with None
                        continue
                    if isinstance(value, list):
                        # DictReader gathers surplus values into one list;
                        # count them so the row is rejected below
                        fields.extend(value)
                    else:
                        fields.append(value)
                if len(fields) != self.number_fields:
                    # Incorrect number of values
                    error = f"Rules require {self.number_fields} fields per row, was given {len(fields)}"
                    raise ValueError(error)
                rules_table.append([self.convert(x.strip()) for x in fields])
        self.rules_table = rules_table

    def convert(self, s):
        """Convert a string to int, else float, else return it unchanged."""
        try:
            return int(s)
        except ValueError:
            try:
                return float(s)
            except ValueError:
                return s

    def operator_eval(self, row, rule):
        """Return whether ``rule`` holds for ``row``.

        Raises:
            ValueError: If the rule's operator is not ``'Equal'``.
        """
        if rule.Operator == 'Equal':
            # Compare string forms of both sides
            return str(row[rule.Column_Name]) == str(rule.Column_Value1)
        # Currently only Equal supported
        error = f"Unsupported Operator {rule.Operator}, only Equal allowed"
        raise ValueError(error)

    def get_rule_value(self, row, rule_queue):
        """Return the value of a rule group, or None if it does not match.

        Args:
            row: Record indexable by column name.
            rule_queue: Heap of ``(priority, rule)`` tuples; it is consumed
                until it is empty or a rule fails.

        Returns:
            The ``Result`` of the last rule popped when every rule holds;
            ``None`` when any rule fails or the queue is empty.
        """
        last_rule = None
        while rule_queue:
            _, rule_to_process = heappop(rule_queue)
            if not self.operator_eval(row, rule_to_process):
                return None
            last_rule = rule_to_process
        # An empty queue has no rule to take a Result from
        return None if last_rule is None else last_rule.Result

    def rules_eval(self, row):
        """Return the value of the first rule group that matches ``row``.

        Groups are evaluated in table order; ``None`` is returned when no
        group matches or the rules table is empty.
        """
        rule_queue = []
        group_id = None
        for r in self.rules_table:
            # Create named tuple with current rule values
            current_rule = self.Rule(*r)
            if rule_queue and current_rule.RuleID != group_id:
                # RuleID changed: the accumulated group is complete
                rule_value = self.get_rule_value(row, rule_queue)
                # Compare with None so falsy results such as 0 are kept
                if rule_value is not None:
                    return rule_value
                # A failed group may leave rules behind; start over
                rule_queue = []
            group_id = current_rule.RuleID
            # Heap orders rules by priority (lowest numbers first).
            # NOTE: rules with equal Priority are ordered by comparing the
            # Rule tuples themselves.
            heappush(rule_queue, (current_rule.Priority, current_rule))
        # Process the final group exactly once
        return self.get_rule_value(row, rule_queue)
# Build the engine from the rules file, then evaluate it against every row
rules_engine = RulesEngine('rules.csv')
df['results'] = df.apply(rules_engine.rules_eval, axis='columns')
print(df)
Tableau des données
ABC,CDE,XYZ
12,10,AD
11,10,AD
12,12,AA
Tableau des règles
Column_Name|Operator|Column_Value1|Operand|RuleID|Priority|Result
ABC | Equal| 12| and| 1| 2|1
CDE | Equal| 10| and| 1| 1|1
XYZ | Equal| AD| and| 1| 3|1.5
ABC | Equal| 11| and| 2| 1|1
CDE | Equal| 10| foo| 2| 2|1.2
ABC | Equal| 12| foo| 3| 1|1.8
Sortie
ABC CDE XYZ results
0 12 10 AD 1.5
1 11 10 AD 1.2
2 12 12 AA 1.8