123 lines
3.7 KiB
Python
123 lines
3.7 KiB
Python
def read_csv(filename):
    """Read a CSV file, return list of rows.

    The file is opened in text mode with ``newline=''`` (as the ``csv``
    module documentation recommends) and parsed with
    ``skipinitialspace=True``, so whitespace directly following a delimiter
    is ignored.

    :param filename: Path of the CSV file to read
    :return: List of rows, each row being a list of strings
    """
    import csv

    with open(filename, 'rt', newline='') as f:
        # list() drains the reader while the file is still open.
        return list(csv.reader(f, skipinitialspace=True))
|
|
|
|
|
|
def header_map(headers):
    """
    Read a list, and convert it to a dictionary where the key is each element
    of the given list and the value is its position in the list.

    If an element occurs more than once, its last position is the one kept.

    :param headers: List
    :return: Dict
    """
    return {header: position for position, header in enumerate(headers)}
|
|
|
|
|
|
def select(table, search_items):
    """
    Return the columns of the table whose headers are listed in search_items.

    The first row of the table is treated as the header row and is included
    (restricted to the selected columns) in the result. Selected columns are
    returned in the order they appear in the table, not in the order they
    are listed in search_items.

    :param table: List of rows, the first row being the headers
    :param search_items: Headers to keep; integers are matched against their
        string form
    :return: List of rows containing only the selected columns; an empty
        list if the table is empty
    :raises KeyError: if a requested header is not in the header row
    """
    if not table:
        # No header row to resolve the search items against.
        return []

    header_numbers = header_map(table[0])
    # Headers are strings, so integer search items are looked up by their
    # string form. Sorting keeps the columns in table order.
    columns = sorted(
        header_numbers[str(item) if isinstance(item, int) else item]
        for item in search_items
    )
    return [[row[number] for number in columns] for row in table]
|
|
|
|
|
|
def row2dict(hmap, row):
    """
    Convert a row in the second argument, given the headers in the first
    argument, to a dictionary which uses the headers as keys and the row data
    as values.

    :param hmap: Dictionary mapping each header to its position in the row
    :param row: List
    :return: Dictionary
    :raises IndexError: if a position in hmap is outside the row
    """
    return {header: row[position] for header, position in hmap.items()}
|
|
|
|
|
|
def check_row(row, query):
    """
    Check whether the row in the first argument passes the query in the
    second argument.

    A query is a tuple ``(left, operator, right)`` in one of two forms:

    * Simple condition: ``left`` is a column name, ``operator`` is one of
      ``==``, ``>=``, ``<=`` and ``right`` is the value to compare against
      (ex: ``('age', '==', 34)``, ``('color', '==', 'blue')``). Values made
      up only of decimal digits are compared as integers, anything else is
      compared as a string.
    * Compound condition: ``left`` and ``right`` are both query tuples and
      ``operator`` is ``AND`` or ``OR``. Sub-queries are evaluated
      recursively, so they may themselves be compound.

    An unrecognised operator makes the check fail.

    :param row: Dictionary mapping column names to the values of the row
    :param query: Tuple
    :return: Boolean
    :raises KeyError: if a simple condition names a column missing from row
    :raises TypeError: if ``>=`` or ``<=`` ends up comparing an integer with
        a non-numeric string
    """

    def as_number(value):
        # isdecimal() rather than isnumeric(): characters such as '½' or '²'
        # count as numeric, yet int() rejects them with a ValueError.
        if isinstance(value, str) and value.isdecimal():
            return int(value)
        return value

    def compare(op, var, cond):
        var, cond = as_number(var), as_number(cond)
        if op == '==':
            return var == cond
        if op == '>=':
            return var >= cond
        if op == '<=':
            return var <= cond
        return False

    left, op, right = query[0], query[1], query[2]

    # A compound query needs BOTH operands to be sub-queries.
    if isinstance(left, tuple) and isinstance(right, tuple):
        # Recurse through check_row itself so nested AND/OR sub-queries are
        # evaluated the same way as top-level ones.
        if op == 'AND':
            return check_row(row, left) and check_row(row, right)
        if op == 'OR':
            return check_row(row, left) or check_row(row, right)
        # Kept for backward compatibility: any other operator is applied to
        # the two tuples themselves.
        return compare(op, left, right)

    return compare(op, row[str(left)], str(right))
|
|
|
|
|
|
def filter_table(table, query):
    """
    This function takes a table of csv values, and performs the query to
    filter out the rows that do not match the query.

    The first row of the table is treated as the header row and is always
    kept as the first row of the result.

    :param table: List of rows, the first row being the headers
    :param query: Tuple, in the format accepted by check_row
    :return: List holding the header row followed by the matching rows; an
        empty list if the table is empty
    """
    if not table:
        # No header row, hence nothing to filter.
        return []

    header_row = header_map(table[0])
    matching_rows = [
        row for row in table[1:]
        if check_row(row2dict(header_row, row), query)
    ]
    return [table[0]] + matching_rows
|