157 lines
4.9 KiB
Python
157 lines
4.9 KiB
Python
|
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
|
||
|
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
|
||
|
|
||
|
"""
|
||
|
Functions to manipulate packed binary representations of number sets.
|
||
|
|
||
|
To save space, coverage stores sets of line numbers in SQLite using a packed
|
||
|
binary representation called a numbits. A numbits is a set of positive
|
||
|
integers.
|
||
|
|
||
|
A numbits is stored as a blob in the database. The exact meaning of the bytes
|
||
|
in the blobs should be considered an implementation detail that might change in
|
||
|
the future. Use these functions to work with those binary blobs of data.
|
||
|
|
||
|
"""
|
||
|
import json
|
||
|
|
||
|
from itertools import zip_longest
|
||
|
|
||
|
from coverage.misc import contract, new_contract
|
||
|
|
||
|
def _to_blob(b):
|
||
|
"""Convert a bytestring into a type SQLite will accept for a blob."""
|
||
|
return b
|
||
|
|
||
|
# Define the 'blob' contract name used in the @contract declarations in this
# module: a value satisfies it when it is a `bytes` instance.
new_contract('blob', lambda v: isinstance(v, bytes))
|
||
|
|
||
|
|
||
|
@contract(nums='Iterable', returns='blob')
def nums_to_numbits(nums):
    """Pack the integers in `nums` into a numbits.

    Each number ``n`` is recorded by setting bit ``n % 8`` of byte ``n // 8``.

    Arguments:
        nums: a reusable iterable of integers, the line numbers to store.
            It is traversed twice: once to find the largest number and once
            to set the bits.

    Returns:
        A binary blob. An empty `nums` produces an empty blob.

    """
    try:
        largest = max(nums)
    except ValueError:
        # max() raises ValueError on an empty iterable: nothing to store.
        return _to_blob(b'')
    # One byte per eight numbers, enough to hold the bit for `largest`.
    packed = bytearray(largest // 8 + 1)
    for num in nums:
        byte_index, bit_index = divmod(num, 8)
        packed[byte_index] |= 1 << bit_index
    return _to_blob(bytes(packed))
|
||
|
|
||
|
|
||
|
@contract(numbits='blob', returns='list[int]')
def numbits_to_nums(numbits):
    """Unpack a numbits into the list of numbers it contains.

    Arguments:
        numbits: a binary blob, the packed number set.

    Returns:
        A list of ints, in increasing order.

    When registered as a SQLite function by :func:`register_sqlite_functions`,
    this returns a string, a JSON-encoded list of ints.

    """
    # Bit `bit_index` of the byte at `byte_index` stands for the number
    # byte_index * 8 + bit_index.
    return [
        byte_index * 8 + bit_index
        for byte_index, byte in enumerate(numbits)
        for bit_index in range(8)
        if byte & (1 << bit_index)
    ]
|
||
|
|
||
|
|
||
|
@contract(numbits1='blob', numbits2='blob', returns='blob')
def numbits_union(numbits1, numbits2):
    """Compute the union of two numbits.

    Returns:
        A new numbits, the union of `numbits1` and `numbits2`.

    """
    merged = bytearray()
    # The shorter blob is padded with zero bytes so that every byte of the
    # longer one is carried into the result.
    for left, right in zip_longest(numbits1, numbits2, fillvalue=0):
        merged.append(left | right)
    return _to_blob(bytes(merged))
|
||
|
|
||
|
|
||
|
@contract(numbits1='blob', numbits2='blob', returns='blob')
def numbits_intersection(numbits1, numbits2):
    """Compute the intersection of two numbits.

    Returns:
        A new numbits, the intersection of `numbits1` and `numbits2`, with
        trailing zero bytes removed.

    """
    # Bytes beyond the end of the shorter numbits cannot hold a common number,
    # so plain zip() is enough: padding the shorter blob with zeros would only
    # produce zero bytes that are stripped again below.
    common = bytes(b1 & b2 for b1, b2 in zip(numbits1, numbits2))
    return _to_blob(common.rstrip(b'\0'))
|
||
|
|
||
|
|
||
|
@contract(numbits1='blob', numbits2='blob', returns='bool')
def numbits_any_intersection(numbits1, numbits2):
    """Is there any number that appears in both numbits?

    Determine whether two number sets have a non-empty intersection.  This is
    faster than computing the intersection.

    Returns:
        A bool, True if there is any number in both `numbits1` and `numbits2`.

    """
    # Only the overlapping prefix can contain a shared number, so stop at the
    # end of the shorter blob instead of padding it with zeros.
    return any(b1 & b2 for b1, b2 in zip(numbits1, numbits2))
|
||
|
|
||
|
|
||
|
@contract(num='int', numbits='blob', returns='bool')
def num_in_numbits(num, numbits):
    """Does the integer `num` appear in `numbits`?

    Arguments:
        num: the integer to look for.
        numbits: a binary blob, the packed number set.

    Returns:
        A bool, True if `num` is a member of `numbits`.  Negative numbers are
        never members.

    """
    if num < 0:
        # A negative number would give a negative byte index, which Python
        # resolves from the end of the blob: a wrong answer, or an IndexError
        # when the blob is empty.
        return False
    nbyte, nbit = divmod(num, 8)
    if nbyte >= len(numbits):
        # The blob is too short to have a bit for this number.
        return False
    return bool(numbits[nbyte] & (1 << nbit))
|
||
|
|
||
|
|
||
|
def register_sqlite_functions(connection):
    """
    Define numbits functions in a SQLite connection.

    This defines these functions for use in SQLite statements:

    * :func:`numbits_union`
    * :func:`numbits_intersection`
    * :func:`numbits_any_intersection`
    * :func:`num_in_numbits`
    * :func:`numbits_to_nums`

    `connection` is a :class:`sqlite3.Connection <python:sqlite3.Connection>`
    object.  After creating the connection, pass it to this function to
    register the numbits functions.  Then you can use numbits functions in your
    queries::

        import sqlite3
        from coverage.numbits import register_sqlite_functions

        conn = sqlite3.connect('example.db')
        register_sqlite_functions(conn)
        c = conn.cursor()
        # Kind of a nonsense query:
        # Find all the files and contexts that executed line 47 in any file:
        c.execute(
            "select file_id, context_id from line_bits where num_in_numbits(?, numbits)",
            (47,)
        )
    """
    def numbits_to_nums_json(blob):
        """Return the numbers in `blob` as a JSON-encoded list."""
        return json.dumps(numbits_to_nums(blob))

    # (SQL function name, number of arguments, Python implementation)
    sql_functions = (
        ("numbits_union", 2, numbits_union),
        ("numbits_intersection", 2, numbits_intersection),
        ("numbits_any_intersection", 2, numbits_any_intersection),
        ("num_in_numbits", 2, num_in_numbits),
        ("numbits_to_nums", 1, numbits_to_nums_json),
    )
    for sql_name, num_args, implementation in sql_functions:
        connection.create_function(sql_name, num_args, implementation)
|