-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtask_flow.py
More file actions
132 lines (118 loc) · 4.35 KB
/
task_flow.py
File metadata and controls
132 lines (118 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from prefect import task, Task, Flow
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import logging
import prefect
from timelogs.source import TimeStamps, Source, MasterDb
from timelogs.message import Message
from timelogs.slack import Slack
from typing import Any, Dict, List
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import json
@task
def get_latest_timestamps():
    '''
    Return the newest Slack message timestamp already stored in the database.

    The value is used as the lower bound for fetching new messages, so a run
    only looks at messages posted after the previous run's newest one.

    Output: float -- the stored maximum timestamp, or the fixed seed
            1622909735.001234 when no timestamp has been stored yet.
    '''
    # Only the maximum is needed; the previous code also fetched every stored
    # timestamp (select_timestamps) and then discarded the result.
    last_ts_db = Slack().select_last_timestamp_by_max()
    if last_ts_db is None:
        # Nothing stored yet (first run): start from a fixed seed timestamp.
        last_ts_db = '1622909735.001234'
    return float(last_ts_db)
def _load_slack_token(token_path='./token.json'):
    '''Read the Slack API token stored under the "SLACK_TOKEN" key of a JSON file.'''
    with open(token_path, encoding='utf-8') as json_file:
        return json.load(json_file)['SLACK_TOKEN']


def _find_channel_id(client, channel_name, logger):
    '''
    Return the ID of the channel named *channel_name*, or None if not found.

    Walks every page yielded by iterating the ``conversations_list`` response
    (the previous code iterated pages but always re-read the first one).
    Slack API errors are logged and treated as "not found" so the caller can
    fall back to a known channel ID.
    '''
    try:
        for page in client.conversations_list():
            for channel in page["channels"]:
                if channel["name"] == channel_name:
                    logger.info(
                        f"For channel #{channel_name} found conversation ID: {channel['id']}"
                    )
                    return channel["id"]
    except SlackApiError as e:
        logger.error(f"Error: {e}")
    return None


@task
def retrive_messages(ts_from_db: float=1522909733.001234):
    '''
    Fetch messages from the #learning channel that are newer than *ts_from_db*.

    For every newer message, its timestamp is also written to the database via
    ``Slack(timestamp=ts).add_timestamp_to_db()``, whether or not it later turns
    out to contain a timelog record, so the next run resumes after it.

    Input: float -- Slack ``ts`` of the newest message handled by a previous run
    Output: List[Message] -- raw Slack message dicts strictly newer than
            *ts_from_db*, in the order returned by the API; [] if the history
            request fails.
    '''
    logger = prefect.context.get("logger")
    client = WebClient(token=_load_slack_token())

    # Prefer the ID resolved by name; fall back to the known ID if the lookup
    # fails or the channel is not in the listing.
    channel_id = _find_channel_id(client, "learning", logger) or "C02327JDKAS"
    logger.info("retrive_messages - last time stamp from db: {}".format(ts_from_db))

    try:
        # NOTE(review): only the first page of history is read; busy channels
        # may need pagination (or the `oldest` argument) to avoid gaps.
        result = client.conversations_history(channel=channel_id)
    except SlackApiError as e:
        logger.error("Error fetching conversation history: {}".format(e))
        # An empty list keeps downstream tasks (which iterate the result) working.
        return []

    conversation_history = result["messages"]
    logger.info("{} messages found in {}".format(len(conversation_history), channel_id))

    new_messages = []
    for message in conversation_history:
        ts = float(message['ts'])
        # Strictly newer: the message whose ts equals ts_from_db was already
        # handled by the previous run (its ts is what the DB max returned).
        if ts > ts_from_db:
            logger.info("newer message - {}\n content: {}".format(ts, message['text']))
            new_messages.append(message)
            # Record the timestamp even if the message yields no timelog record.
            # NOTE(review): this happens before parsing/insertion succeed, so a
            # downstream failure means these messages are not retried.
            Slack(timestamp=ts).add_timestamp_to_db()
    logger.info("new_messages: {}".format(new_messages))
    return new_messages
@task
def parse_messages(messages: List[str] = None):
    '''
    Turn raw Slack messages into timelog records and apply inline commands.

    Each message's text is split on newlines and every line is wrapped in a
    ``Message`` (author = the message's 'user', ts = its 'ts'). For each line:
      * ``to_records()``  -- a truthy result is collected as a timelog record;
      * ``check_add_me()`` -- when first name, last name and email are all
        truthy, the author is inserted into the master DB;
      * ``check_delete()`` -- when it returns a truthy date, the author's
        timelogs from that date are deleted.

    Messages without a 'user' key (presumably bot/system posts -- verify) are
    skipped with a warning instead of aborting the whole batch.

    Input: List[Message] -- Slack message dicts with 'text', 'user' and 'ts';
           None is treated as an empty list.
    Output: List[Message] -- the collected records, in message/line order.
    '''
    logger = prefect.context.get("logger")
    records = []
    for message in messages or []:
        user = message.get('user')
        if user is None:
            logger.warning(f"Skipping message without a user: {message.get('ts')}")
            continue
        timestamp = message['ts']
        for single_line in message['text'].split('\n'):
            msg = Message(text=single_line, user=user, ts=timestamp)

            record = msg.to_records()
            if record:
                logger.info(f"New record: {record}")
                records.append(record)

            # "add me" command: register the author in the master DB.
            first_name, last_name, email = msg.check_add_me()
            if first_name and last_name and email:
                logger.info(f"New User: {first_name} {last_name}")
                Slack().insert_into_master_db(
                    id=user, first_name=first_name, last_name=last_name, email=email
                )

            # Delete command: drop the author's timelogs from the given date.
            delete_date = msg.check_delete()
            if delete_date:
                logger.info(f"Delete timelogs from: {delete_date}")
                Slack().delete_timelog(user=user, delete_date=delete_date)
    logger.info("records: {}".format(records))
    return records
@task
def insert_into_db(records: List[str] = None):
    '''
    Persist each timelog record via ``Slack(messages=record).insert_into()``.

    Input: List[Message] -- records produced by parse_messages; None (the
           default) or an empty list is a no-op instead of a TypeError.
    '''
    for record in records or []:
        Slack(messages=record).insert_into()
# Build the task graph: inside the Flow context, calling a task wires it into
# the flow instead of executing it immediately.
with Flow('Slack messages') as flow:
    ts_latest = get_latest_timestamps()
    messages = retrive_messages(ts_latest)
    timelogs = parse_messages(messages)
    add_to_database = insert_into_db(timelogs)

# Run only when executed as a script, so loading this module (e.g. by tooling
# that extracts or registers `flow`) does not trigger a full run.
if __name__ == "__main__":
    flow.run()