-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTraining.py
More file actions
139 lines (117 loc) · 4.7 KB
/
Training.py
File metadata and controls
139 lines (117 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import ccxt
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
import time
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Set up Binance client using ccxt.
# Credentials are taken from the BINANCE_API_KEY / BINANCE_API_SECRET
# environment variables so real keys never need to be written into this file;
# the original placeholder strings remain the fallback when they are unset.
api_key = os.environ.get('BINANCE_API_KEY', 'YOUR_API_KEY')
api_secret = os.environ.get('BINANCE_API_SECRET', 'YOUR_API_SECRET')
exchange = ccxt.binance({
    'apiKey': api_key,
    'secret': api_secret,
    'enableRateLimit': True,
    'options': {'defaultType': 'future'},  # for futures trading
})
# Define the trading symbol and fetch historical data
symbol = 'BTC/USDT'  # You can change this to other pairs like ETH/USDT, BNB/USDT
timeframe = '1h'  # Timeframe for OHLCV data, change as needed
limit = 43800  # Number of candles to request (equals 24 * 365 * 5)
# Function to fetch OHLCV data and compute technical indicators
def fetch_data(symbol, timeframe='1h', limit=43800):
    """Fetch up to `limit` OHLCV candles and attach indicator columns.

    The exchange caps how many candles a single request may return, so the
    history is downloaded page by page, walking forward from
    ``now - limit * timeframe`` until `limit` candles have been collected or
    the exchange has no more data.

    Args:
        symbol: Market symbol passed to ``exchange.fetch_ohlcv``.
        timeframe: Candle timeframe string understood by the exchange.
        limit: Total number of candles wanted.

    Returns:
        A DataFrame with ``timestamp``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` plus ``rsi``, ``adx``, ``atr``, ``vwap`` and
        ``taker_ratio`` columns (and any helper columns the indicator
        functions add), or ``None`` if anything fails or no candles are
        returned.
    """
    # Conservative page size; a single request for `limit` candles would be
    # silently truncated by the exchange.
    max_per_request = 1000
    try:
        timeframe_ms = exchange.parse_timeframe(timeframe) * 1000
        since = exchange.milliseconds() - limit * timeframe_ms

        ohlcv = []
        while len(ohlcv) < limit:
            batch = exchange.fetch_ohlcv(
                symbol,
                timeframe,
                since=since,
                limit=min(max_per_request, limit - len(ohlcv)),
            )
            if not batch:
                break  # reached the most recent candle
            ohlcv.extend(batch)
            # Resume right after the last candle received to avoid duplicates.
            since = batch[-1][0] + timeframe_ms

        if not ohlcv:
            print(f"Error fetching data for {symbol}: no candles returned")
            return None

        df = pd.DataFrame(
            ohlcv,
            columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
        )
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

        # Calculate technical indicators
        df['rsi'] = compute_rsi(df['close'])
        df['adx'] = compute_adx(df)
        df['atr'] = compute_atr(df)
        df['vwap'] = compute_vwap(df)
        df['taker_ratio'] = compute_taker_ratio(df)
        return df
    except Exception as e:
        # Callers rely on a None return to signal failure, so the broad
        # catch is kept and the reason is reported.
        print(f"Error fetching data for {symbol}: {str(e)}")
        return None
# Compute RSI
def compute_rsi(data, period=14):
    """Return the Relative Strength Index of a price series.

    Gains and losses are averaged with a plain rolling mean over `period`
    observations; entries without a full window are NaN.

    Args:
        data: pandas Series of prices.
        period: Rolling window length.

    Returns:
        A Series aligned with `data`.
    """
    change = data.diff()
    is_up = change > 0
    is_down = change < 0

    # Keep only upward moves / only downward moves (as positive magnitudes).
    ups = change.where(is_up, 0)
    downs = -change.where(is_down, 0)

    mean_up = ups.rolling(window=period).mean()
    mean_down = downs.rolling(window=period).mean()

    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))
# Compute ADX
def compute_adx(df, period=14):
    """Return the Average Directional Index (ADX) for an OHLC frame.

    The previous implementation returned a rolling mean of the true range
    (i.e. an ATR), which duplicated the ``atr`` feature. This version derives
    +DI / -DI from directional movement and averages the resulting DX.
    All smoothing uses plain rolling means over `period`, matching the other
    indicators in this file.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
        period: Rolling window length.

    Returns:
        A Series aligned with `df`. Leading entries without full windows are
        NaN, as are entries where both directional indicators are zero.
    """
    high = df['high']
    low = df['low']
    prev_close = df['close'].shift(1)

    # np.maximum is binary (a third positional argument is `out`), so the
    # three-way maximum must be nested.
    tr = np.maximum(
        high - low,
        np.maximum((high - prev_close).abs(), (low - prev_close).abs()),
    )

    # Directional movement: only the dominant, positive move counts.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    atr = tr.rolling(window=period).mean()
    plus_di = 100 * plus_dm.rolling(window=period).mean() / atr
    minus_di = 100 * minus_dm.rolling(window=period).mean() / atr

    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di)
    return dx.rolling(window=period).mean()
# Compute ATR
def compute_atr(df, period=14):
    """Return the Average True Range as a rolling mean of the true range.

    True range is the largest of ``high - low``, ``|high - prev_close|`` and
    ``|low - prev_close|``. The original call passed all three to
    ``np.maximum`` positionally; its third positional argument is `out`, so
    the last term was never part of the maximum.

    Side effect: the true range is stored in ``df['tr']``, as before.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
        period: Rolling window length.

    Returns:
        A Series aligned with `df`; entries without a full window are NaN.
    """
    prev_close = df['close'].shift(1)
    high_low = df['high'] - df['low']
    high_close = (df['high'] - prev_close).abs()
    low_close = (df['low'] - prev_close).abs()

    # np.maximum is binary, so nest it to take the maximum of three series.
    df['tr'] = np.maximum(high_low, np.maximum(high_close, low_close))
    return df['tr'].rolling(window=period).mean()
# Compute VWAP
def compute_vwap(df):
    """Return the cumulative volume-weighted average of the close price.

    Side effect: the result is also written to ``df['vwap']``.

    Args:
        df: DataFrame with ``volume`` and ``close`` columns.

    Returns:
        The ``vwap`` column of `df`.
    """
    volume = df['volume']
    traded_value_total = (volume * df['close']).cumsum()
    volume_total = volume.cumsum()
    df['vwap'] = traded_value_total / volume_total
    return df['vwap']
# Compute Taker Ratio (Custom Indicator)
def compute_taker_ratio(df):
    """Return each row's volume divided by the mean volume of the frame.

    Args:
        df: DataFrame with a ``volume`` column.

    Returns:
        A Series aligned with `df`.
    """
    volume = df['volume']
    average_volume = volume.mean()
    return volume / average_volume
# Preprocess the data for CNN
def preprocess_data(df, window_size=20):
    """Build sliding-window samples and next-bar direction labels.

    Rows where any feature or the close price is NaN or infinite (for
    example the indicator warm-up period) are dropped first, so no window
    contains non-finite values. For each position ``i`` the sample is the
    `window_size` rows before ``i`` and the label is 1 when ``close[i]`` is
    above ``close[i - 1]``, else 0.

    Args:
        df: DataFrame with ``rsi``, ``adx``, ``atr``, ``vwap``,
            ``taker_ratio`` and ``close`` columns.
        window_size: Number of consecutive rows per sample (at least 1).

    Returns:
        ``(X, y)`` where ``X`` has shape ``(samples, window_size, 5)`` and
        ``y`` has shape ``(samples,)``. Both are empty, with those shapes,
        when there are not enough clean rows.

    Raises:
        ValueError: If `window_size` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    feature_columns = ['rsi', 'adx', 'atr', 'vwap', 'taker_ratio']
    clean = (
        df[feature_columns + ['close']]
        .replace([np.inf, -np.inf], np.nan)
        .dropna()
    )
    features = clean[feature_columns].to_numpy(dtype=float)
    close = clean['close'].to_numpy(dtype=float)

    if len(features) <= window_size:
        # Keep the 3-D shape so callers can still read X.shape[1:].
        empty_X = np.empty((0, window_size, len(feature_columns)))
        return empty_X, np.empty((0,), dtype=int)

    # Create sliding windows of size `window_size`
    X = np.stack([
        features[end - window_size:end]
        for end in range(window_size, len(features))
    ])
    # Predicting 1 for price going up, 0 for price going down
    y = (close[window_size:] > close[window_size - 1:-1]).astype(int)
    return X, y
# Build the CNN model using Keras
def build_cnn_model(input_shape):
    """Assemble and compile the 1-D convolutional binary classifier.

    Two Conv1D / MaxPooling1D / Dropout stages feed a dense layer and a
    single sigmoid output unit. The model is compiled with the Adam
    optimizer, binary cross-entropy loss and an accuracy metric.

    Args:
        input_shape: Shape of one sample, forwarded to the first Conv1D
            layer.

    Returns:
        The compiled ``tf.keras.Sequential`` model.
    """
    layers = tf.keras.layers
    model = tf.keras.Sequential()

    # First convolutional stage
    model.add(layers.Conv1D(64, kernel_size=3, activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Dropout(0.2))

    # Second convolutional stage
    model.add(layers.Conv1D(128, kernel_size=3, activation='relu'))
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Dropout(0.2))

    # Classification head
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dense(1, activation='sigmoid'))  # Binary classification (up/down movement)

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model
# Train the CNN model
def train_model():
    """Fetch data, train the CNN, save it and report hold-out accuracy.

    The train/test split is chronological (``shuffle=False``): samples are
    overlapping windows of a time series, so a shuffled split would place
    near-identical windows on both sides and inflate the reported accuracy.

    Returns early (printing a message where applicable) when data cannot be
    fetched or there are not enough rows to build a training window.
    """
    # Fetch data from Binance using the module-level configuration
    df = fetch_data(symbol, timeframe, limit)
    if df is None:
        return

    # Preprocess the data for CNN; X is (samples, timesteps, features)
    X, y = preprocess_data(df)
    if X.ndim != 3 or len(X) == 0:
        print("Not enough data to build training windows")
        return

    # Train-test split: the most recent 20% of windows is held out
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False
    )

    # Build and train the CNN model
    model = build_cnn_model(X_train.shape[1:])
    model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))

    # Save the trained model
    model.save('trained_cnn_model.h5')
    print("Model saved as 'trained_cnn_model.h5'")

    # Evaluate the model; flatten the (n, 1) predictions to match y_test
    y_pred = (model.predict(X_test) > 0.5).astype(int).ravel()
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model accuracy: {accuracy * 100:.2f}%")
# Main execution
if __name__ == "__main__":
    # Train only when this file is run as a script, not when it is imported.
    train_model()