forked from clicksign/python-dev-test
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpopula.py
More file actions
74 lines (56 loc) · 1.81 KB
/
popula.py
File metadata and controls
74 lines (56 loc) · 1.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import random, time, sqlite3, os
from sqlite3 import Error
from airflow import DAG
import pandas as pd
import os
PATH = "/home/churebas/projetos/python-dev-test/data/"
def verifica_arquivo():
"""verifica se o DataFrame adult esta vazio, se sim, exclui"""
arquivo = PATH + "adult.csv"
if os.path.isfile(arquivo):
df = pd.read_csv(arquivo)
if df.shape[0] == 0:
os.remove(arquivo)
parar = True
else:
parar = False
else:
parar = True
if parar:
os.system("export AIRFLOW_HOME=$(pwd)/airflow")
os.system("airflow dags pause popula_bd")
print("parou")
def percorre_df(quantidade):
"""puxa os dados do dataframe adult e popula banco de dados """
arquivo = PATH + "adult.csv"
df = pd.read_csv(arquivo)
if quantidade > df.shape[0]:
quantidade = df.shape[0]
usar_df = df.iloc[:quantidade]
coon = sqlite3.connect(PATH + "adult.db")
usar_df.to_sql("adult", coon, index=False, if_exists="append")
df.drop(index=[n for n in range(quantidade)], inplace=True)
df.to_csv(arquivo, index=False)
default_args = {
'owner': 'victor hugo',
"email": ['vh15fleury@hotmail.com'],
'retries': 1,
}
with DAG(
'popula_bd',
default_args=default_args,
start_date = datetime.now(),
schedule_interval = timedelta(seconds=10),
catchup=False,
tags=['ETL']
) as dag:
t1 = PythonOperator(
task_id='verifica_arquivo',
python_callable=verifica_arquivo)
t2 = PythonOperator(
task_id='popula_adult',
python_callable=percorre_df,
op_kwargs={'quantidade': 1630}),
t1 >> t2