-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
196 lines (173 loc) · 7.13 KB
/
main.py
File metadata and controls
196 lines (173 loc) · 7.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# -*- coding: utf-8 -*-
import json
import sqlite3
from secrets import token_bytes
from cloudproof_py import cover_crypt
from cloudproof_py import findex
from findex_db import FindexSQLite
from termcolor import colored
SQL_CREATE_USERS_TABLE = """CREATE TABLE IF NOT EXISTS users (
id BLOB PRIMARY KEY,
firstName BLOB NOT NULL,
lastName BLOB NOT NULL,
email BLOB NOT NULL,
phone BLOB NOT NULL,
country BLOB NOT NULL,
region BLOB NOT NULL,
employeeNumber BLOB NOT NULL,
security BLOB NOT NULL
);"""
SQL_CREATE_ENTRY_TABLE = """CREATE TABLE IF NOT EXISTS entry_table (
uid BLOB PRIMARY KEY,
value BLOB NOT NULL
);"""
SQL_CREATE_CHAIN_TABLE = """CREATE TABLE IF NOT EXISTS chain_table (
uid BLOB PRIMARY KEY,
value BLOB NOT NULL
);"""
if __name__ == "__main__":
# Creating DB tables
conn = sqlite3.connect(":memory:")
# Table to store encrypted data
conn.execute(SQL_CREATE_USERS_TABLE)
# Indexing tables required by Findex
conn.execute(SQL_CREATE_ENTRY_TABLE)
conn.execute(SQL_CREATE_CHAIN_TABLE)
# Initialize CoverCrypt
policy = cover_crypt.Policy()
policy.add_axis(
cover_crypt.PolicyAxis(
"Country",
[("France", False), ("Spain", False), ("Germany", False)],
hierarchical=False,
)
)
policy.add_axis(
cover_crypt.PolicyAxis(
"Department",
[("MKG", False), ("HR", False), ("SEC", False)],
hierarchical=True,
)
)
cc_interface = cover_crypt.CoverCrypt()
cc_master_key, cc_public_key = cc_interface.generate_master_keys(policy)
# Creating user key with different policy access
key_Alice = cc_interface.generate_user_secret_key(
cc_master_key,
"Country::France && Department::MKG",
policy,
)
key_Bob = cc_interface.generate_user_secret_key(
cc_master_key,
"(Country::Spain || Country::Germany) && Department::HR",
policy,
)
key_Charlie = cc_interface.generate_user_secret_key(
cc_master_key,
"(Country::France || Country::Spain) && Department::SEC",
policy,
)
# Declare encryption scheme
mapping_field_department = {
"firstName": "MKG",
"lastName": "MKG",
"phone": "HR",
"email": "HR",
"country": "HR",
"region": "HR",
"employeeNumber": "SEC",
"security": "SEC",
}
# Insert user data to DB + Indexing
with open("./data.json", "r", encoding="utf-8") as f:
users = json.load(f)
# Encryption + Insertion in DB
user_db_uids = []
for user in users:
# Encrypt each column of the user individually
encrypted_user = [
cc_interface.encrypt(
policy,
f"Country::{user['country']} && Department::{mapping_field_department[col_name]}",
cc_public_key,
col_value.encode("utf-8"),
)
for col_name, col_value in user.items()
]
db_uid = token_bytes(32)
user_db_uids.append(db_uid)
conn.execute(
"""INSERT INTO
users(id,firstName,lastName, email, phone, country, region, employeeNumber, security)
VALUES(?,?,?,?,?,?,?,?,?)""",
(db_uid, *encrypted_user),
)
print("CoverCrypt: encryption and db insertion done!")
# Initialize Findex
findex_interface = FindexSQLite(findex.Key.random(), "My label", conn)
# Mapping of the users database UID to the corresponding keywords (firstname, lastname, etc)
mapping_indexed_values_to_keywords: findex.typing.IndexedValuesAndKeywords = {
findex.Location.from_bytes(user_id): [
keyword.lower() for keyword in user.values()
]
for user_id, user in zip(user_db_uids, users)
}
# Upsert keywords
findex_interface.findex.add(mapping_indexed_values_to_keywords)
print("Findex: Done indexing", len(users), "users")
print(
"Auto completion available: only type the 3 first letters of a word to get results"
)
activate_auto_completion = input("Activate search auto completion? [y/n] ") == "y"
if activate_auto_completion:
keywords = [keyword.lower() for user in users for keyword in user.values()]
findex_interface.findex.add(findex.utils.generate_auto_completion(keywords))
cc_user_keys = {"Alice": key_Alice, "Bob": key_Bob, "Charlie": key_Charlie}
while True:
print("\n Available user keys:")
print("\t Alice: Country::France && Department::MKG")
print("\t Bob: (Country::Spain || Country::Germany) && Department::HR")
print("\t Charlie: (Country::France || Country::Spain) && Department::SEC")
input_user_key = ""
while input_user_key not in cc_user_keys:
input_user_key = input(
"Choose a user key from 'Alice', 'Bob' and 'Charlie': "
)
user_key = cc_user_keys[input_user_key]
print("\n You can now search the database for users by providing keywords")
print("Examples of words to try: 'Martin', 'France', 'Kalia'")
keyword = input("Enter a keyword: ").lower()
# 1. Findex search
found_users = findex_interface.findex.search([keyword])
if len(found_users) == 0:
print(colored("No user found!", "red", attrs=["bold"]))
continue
found_users_uids = [bytes(location) for location in found_users[keyword]]
# 2. Query user database
str_uids = ",".join("?" * len(found_users_uids))
cur = conn.execute(
f"SELECT * FROM users WHERE id IN ({str_uids})",
found_users_uids,
)
encrypted_data = cur.fetchall()
# 3. Decryption
print("Query results:")
for db_user in encrypted_data:
encrypted_user = db_user[1:] # skip the uid
for i, col_name in enumerate(mapping_field_department):
try:
decrypted_value, _ = cc_interface.decrypt(
user_key, encrypted_user[i]
)
print(
f"{col_name}: {decrypted_value.decode('utf-8'):12.12}",
end=" | ",
)
except Exception:
# Our user doesn't have access to this data
print(
f"{col_name}:",
colored("Unauthorized", "red", attrs=["bold"]),
end=" | ",
)
print("")