#####################################################################################
"""
1. 공통적으로 사용되는 주가분석 등의 함수들의 모음
[변경이력]
(23.07.29) ATR class 추가_https://technical-analysis-library-in-python.readthedocs.io/en/latest/ta.html#volatility-indicators
(23.10.06) 마켓타이밍 class MT 추가
(23.10.29) stock_data, index_data 새로 만들기
(23.11.05)
한국투자도 같이 사용하기
(23.11.15)
etn 현재가를 못읽어 온다.
"""
######################################################################################
import pandas as pd
import datetime as dt
import subprocess
import Util_Hankook_KR_V3_devel_231110 as hk
import FinanceDataReader as fdr
import pymysql, time
import talib
import pykrx
# pd.set_option('display.max_columns', None) ## 모든 열을 출력한다
# pd.set_option('display.max_rows', None) ## 모든 열을 출력한다
class Stock_data:
    """Price lookups for individual stocks.

    Every lookup first asks the ``hk`` (Korea Investment) wrapper and falls
    back to FinanceDataReader (through ``Fdr``) when that call raises.
    """

    def __init__(self):
        self.hk_sd = hk.Stock_data()
        self.fdr_c = Fdr()

    # --- individual stocks ---
    def get_ohlcv_df(self, code, period):
        """Return an OHLCV DataFrame for ``code``.

        Args:
            code: Ticker code, passed through unchanged to the data source.
            period: Number of days requested from the data source.

        Returns:
            Whatever DataFrame the primary source returns; if the primary
            source raises, the result of ``Fdr.get_stock_ohlcv_df``.
        """
        time.sleep(0.1)  # short pause before each request (presumably rate limiting)
        try:
            df = self.hk_sd.get_ohlcv_df(code, period)
        except Exception:
            # `except Exception` (not a bare except) so Ctrl-C / SystemExit
            # are not swallowed by the fallback.
            print(f'FDR로 구합니다')
            df = self.fdr_c.get_stock_ohlcv_df(code, period)
        return df

    def _last_close(self, code):
        """Return the most recent ``close`` from a 5-day OHLCV frame."""
        df = self.get_ohlcv_df(code, period=5)
        return df['close'].iloc[-1]

    def get_current_price(self, code):
        """Return the current price of ``code`` as an ``int``.

        The API value is used when it is a usable non-zero number. When the
        API raises, returns ``None``, returns zero (``'0'`` / ``0``) or
        returns something that cannot be converted to ``int``, the latest
        daily close is used instead.

        The previous condition ``current_price == None or '0'`` was always
        true (a non-empty string is truthy), so the API price was never
        actually returned.
        """
        time.sleep(0.1)
        try:
            raw_price = self.hk_sd.get_current_price(code)
            print(raw_price)
            api_price = int(raw_price) if raw_price is not None else 0
        except Exception:
            api_price = 0
        if api_price == 0:
            return int(self._last_close(code))
        return api_price

    def get_price_before_ndays(self, code, days=-20):
        """Return the ``close`` at positional offset ``days``.

        Args:
            code: Ticker code.
            days: Offset handed straight to ``iloc``; the default ``-20``
                reads the 20th row from the end. ``abs(days)`` rows are
                requested from the data source.
        """
        time.sleep(0.1)
        period = abs(days)
        df = self.get_ohlcv_df(code, period)
        return df['close'].iloc[days]
class Index_data:
    """Price lookups for indices (original note: also for US stocks).

    The ``hk`` index wrapper is tried first; ``Fdr.get_index_ohlcv_df`` is
    the fallback when it raises.
    """

    def __init__(self):
        self.hk_id = hk.Index_data()
        self.fdr_c = Fdr()

    # --- indices or US stocks: Adj Close must be used (original author's note) ---
    def get_ohlcv_df(self, code, period):
        """Return an OHLCV DataFrame for index ``code``.

        Args:
            code: Index code, passed through unchanged.
            period: Number of days requested from the data source.
        """
        time.sleep(0.1)  # short pause before each request (presumably rate limiting)
        try:
            df = self.hk_id.get_ohlcv_df(code, period)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # propagate instead of silently triggering the fallback.
            df = self.fdr_c.get_index_ohlcv_df(code, period)
        return df

    def get_current_price(self, code):
        """Return the last ``close`` of a 5-day OHLCV frame."""
        time.sleep(0.1)
        df = self.get_ohlcv_df(code, 5)
        return df['close'].iloc[-1]

    def get_price_before_ndays(self, code, days=-20):
        """Return the ``close`` at positional offset ``days``.

        ``int(abs(days) * 1.7)`` days are requested so that enough rows are
        available for the ``iloc[days]`` lookup.
        """
        time.sleep(0.1)
        period = int(abs(days) * 1.7)
        df = self.get_ohlcv_df(code, period)
        return df['close'].iloc[days]
class Etf_data:
    """ETF NAV lookup: Naver first, pykrx as the fallback."""

    def __init__(self):
        self.hk_id = hk.Index_data()
        self.fdr_c = Fdr()
        self.naver_c = Naver()
        self.pykrx_c = Pykrx()

    def get_etf_nav(self, code):
        """Return the NAV of ETF ``code``.

        ``Naver.get_etf_nav`` is tried first; if it raises, the value from
        ``Pykrx.get_etf_nav`` is returned instead. An exception raised by the
        fallback propagates to the caller.
        """
        try:
            etf_nav = self.naver_c.get_etf_nav(code)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are not converted into a fallback request.
            etf_nav = self.pykrx_c.get_etf_nav(code)
        return etf_nav
class Pykrx:
    """ETF NAV / price-deviation lookups through ``pykrx``."""

    def __init__(self):
        self.hk_sd = hk.Stock_data()
        self.fdr_c = Fdr()

    def get_etf_nav(self, code):
        """Return the latest ``NAV`` value from the last 5 calendar days.

        Raises:
            IndexError: If pykrx returns no rows for the window.
        """
        time.sleep(0.3)
        now = dt.datetime.now()
        start_date_str = (now - dt.timedelta(days=5)).strftime("%Y-%m-%d")
        end_date_str = now.strftime("%Y-%m-%d")
        df = pykrx.stock.get_etf_price_deviation(start_date_str, end_date_str, code)
        # `.iloc[-1]` is an explicit positional lookup; `series[-1]` on a
        # labelled index is deprecated in pandas because it is ambiguous.
        return df['NAV'].iloc[-1]

    def get_etf_nav_gap_rate_avg(self, code, period):
        """Return the mean absolute deviation rate ('괴리율') for ``code``.

        The value is a percentage (original author's note). The window covers
        ``int(period * 1.7)`` calendar days up to now.

        Raises:
            ZeroDivisionError: If pykrx returns no rows for the window.
        """
        time.sleep(0.3)
        now = dt.datetime.now()
        start_date_str = (now - dt.timedelta(days=int(period * 1.7))).strftime("%Y-%m-%d")
        end_date_str = now.strftime("%Y-%m-%d")
        df = pykrx.stock.get_etf_price_deviation(start_date_str, end_date_str, code)
        print(df)
        # NOTE(review): the original author questioned whether taking the
        # absolute value is the right choice here.
        total_gap = sum(abs(float(gap)) for gap in df['괴리율'])
        GapAvg = total_gap / len(df)
        print("GapAvg", GapAvg)
        return GapAvg
class Fdr:
    """OHLCV lookups through FinanceDataReader."""

    def __init__(self):
        pass

    def _read_ohlcv(self, code, period, close_column):
        """Fetch rows for ``code`` and normalise the column names.

        ``close_column`` names the source column that becomes ``close``.
        """
        time.sleep(0.3)
        lookback_days = int((period * 1.5) + 1)
        since = dt.datetime.now() - dt.timedelta(days=lookback_days)
        raw = fdr.DataReader(code, since.strftime('%Y-%m-%d'))
        column_map = {
            'Open': 'open',
            'High': 'high',
            'Low': 'low',
            close_column: 'close',
            'Volume': 'volume',
        }
        # Select the source columns, drop incomplete rows, then rename.
        cleaned = raw[list(column_map)].dropna().rename(columns=column_map)
        return cleaned[['open', 'high', 'low', 'close', 'volume']]

    def get_stock_ohlcv_df(self, code, period):
        """Return open/high/low/close/volume for a stock, using ``Close``.

        The request starts ``int(period * 1.5 + 1)`` calendar days ago.
        """
        return self._read_ohlcv(code, period, 'Close')

    def get_index_ohlcv_df(self, code, period):
        """Return open/high/low/close/volume for an index, using ``Adj Close``.

        The request starts ``int(period * 1.5 + 1)`` calendar days ago.
        """
        return self._read_ohlcv(code, period, 'Adj Close')
class Naver:
    """ETF NAV lookups scraped from an HTML page with ``pd.read_html``."""

    def __init__(self):
        pass

    def get_etf_nav(self, code):
        """Return the NAV of ``code`` as an ``int``.

        Reads table index 8 of the page and converts the cell at column
        label ``1``, row label ``0``.
        """
        time.sleep(0.3)
        page_url = "https://[Log in to view URL]" + code
        tables = pd.read_html(page_url, encoding='euc-kr')
        nav_table = tables[8]
        return int(nav_table[1][0])

    # Original author's note: Naver serves a single page, i.e. only about
    # 10 days of data, so `period` has no effect.
    def get_etf_nav_gap_rate_avg(self, code, period):
        """Return the mean absolute deviation rate ('괴리율') in percent.

        Only cells whose text contains ``%`` are counted. ``period`` is
        accepted but not used. Raises ``ZeroDivisionError`` when no cell
        qualifies.
        """
        time.sleep(0.3)
        page_url = "https://[Log in to view URL]" + code
        tables = pd.read_html(page_url, encoding='euc-kr')
        gap_cells = tables[4]["괴리율"].to_list()
        print(gap_cells)
        # NOTE(review): the original author questioned whether taking the
        # absolute value is the right choice here.
        gaps = [
            abs(float(cell.replace('%', '')))
            for cell in gap_cells
            if "%" in str(cell)
        ]
        return sum(gaps) / len(gaps)
class Stock_data_using_db:
    """Stock price lookups backed by a local MySQL table, with an FDR fallback."""

    def __init__(self):
        # DB connection
        self.db_name = "stock_analysis"
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or environment variables.
        self.engine = pymysql.connect(host='localhost', user='root', password='njmr0623$$', db=f'{self.db_name}',
                                      charset='utf8')
        self.table_name = "daily_price_1y"  # the 1-year table
        # Korea Investment & Securities wrapper
        self.hk_sd = hk.Stock_data()

    def get_stock_ohlc_df(self, code, period=20):
        """Return price rows for ``code`` starting ``period * 2`` days ago.

        The DB is queried first; when it returns no rows, FinanceDataReader
        is used and its columns are normalised to
        open/high/low/close/volume.

        The query is parameterized. The previous string-built SQL inserted
        the date unquoted (``date >= 2023-10-01`` is evaluated as arithmetic
        by SQL) and the code unquoted, so the date filter did not do what
        was intended and the query was open to injection.
        """
        time.sleep(0.1)
        start_date = (dt.datetime.now() - dt.timedelta(period * 2)).strftime('%Y-%m-%d')
        # Only the table name (a constant set in __init__) is interpolated;
        # the values go through the driver's %s placeholders.
        query = f"select * from {self.table_name} where code=%s and date >= %s"
        df = pd.read_sql(query, con=self.engine, params=(code, start_date))
        # If the DB returned nothing, fetch through FDR instead.
        if len(df) == 0:
            print(f'FDR로 구하기')
            df = fdr.DataReader(code, start_date)
            df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
            df = df.dropna()
            df = df.rename(columns={'Open': 'open', 'High': 'high', 'Low': 'low', 'Close': 'close', 'Volume': 'volume'})
            df = df[['open', 'high', 'low', 'close', 'volume']]
        return df

    def get_current_price(self, code):
        """Return the last ``close`` in the default window."""
        time.sleep(0.1)
        df = self.get_stock_ohlc_df(code)
        return df['close'].iloc[-1]

    def get_price_before_ndays(self, code, days=-20):
        """Return the ``close`` at positional offset ``days - 1``.

        Now that the date filter is effective, the window is sized from
        ``days`` (never smaller than the default 20) so the row being read
        is inside the returned frame.
        """
        time.sleep(0.1)
        df = self.get_stock_ohlc_df(code, period=max(20, abs(days) + 1))
        return df['close'].iloc[days - 1]

    def get_volume_before_ndays(self, code, days):
        """Return the ``volume`` at positional offset ``days``.

        The window is sized from ``days`` (never smaller than the default 20).
        """
        time.sleep(0.1)
        df = self.get_stock_ohlc_df(code, period=max(20, abs(days) + 1))
        return df['volume'].iloc[days]
class Ma:
    """Moving averages (simple and exponential) over the ``close`` column."""

    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()

    def _get_ohlcv_df(self, code, period):
        """Fetch OHLCV through the stock source, else the index source."""
        try:
            return self.sd.get_ohlcv_df(code, period)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are not swallowed.
            return self.id.get_ohlcv_df(code, period)

    # --- simple moving average ---
    def get_sma_df(self, code, period):
        """Return the OHLCV frame with an added ``'{period}_ma'`` column.

        ``min_periods=1`` makes the mean use however many rows are available;
        without it ``rolling`` yields NaN until ``period`` rows have
        accumulated.
        """
        df = self._get_ohlcv_df(code, period)
        df[f'{period}_ma'] = df['close'].rolling(window=period, min_periods=1).mean()
        return df

    def get_sma_value(self, code, period):
        """Return the last value of the ``'{period}_ma'`` column."""
        df = self.get_sma_df(code, period)
        return df[f'{period}_ma'].iloc[-1]

    # --- exponential moving average ---
    def get_ema_df(self, code, period):
        """Return the OHLCV frame with an added ``'{period}_ema'`` column.

        The column is computed by ``talib.EMA`` over ``close``.
        """
        df = self._get_ohlcv_df(code, period)
        df[f'{period}_ema'] = talib.EMA(df['close'], period)
        return df

    def get_ema_value(self, code, period):
        """Return the last value of the ``'{period}_ema'`` column."""
        df = self.get_ema_df(code, period)
        return df[f'{period}_ema'].iloc[-1]
class Bb:
    """Bollinger bands over the ``close`` column."""

    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()

    def get_bb_df(self, code, period, gap):
        """Return the OHLCV frame with moving average, std-dev and band columns.

        Args:
            code: Ticker or index code.
            period: Rolling window length.
            gap: Band width as a multiple of the rolling standard deviation
                (e.g. 0.5, 1, 2).

        Added columns: ``'{period}_ma'``, ``'{period}bb_stddev'``,
        ``'{period}bb_upper'`` and ``'{period}bb_lower'``.
        """
        try:
            df = self.sd.get_ohlcv_df(code, period)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are not swallowed.
            df = self.id.get_ohlcv_df(code, period)
        # min_periods=1 lets the statistics use however many rows exist;
        # without it rolling() yields NaN until `period` rows are available.
        closes = df['close'].rolling(window=period, min_periods=1)
        df[f'{period}_ma'] = closes.mean()
        df[f'{period}bb_stddev'] = closes.std()  # rolling standard deviation
        df[f'{period}bb_upper'] = df[f'{period}_ma'] + gap * df[f'{period}bb_stddev']  # upper band
        df[f'{period}bb_lower'] = df[f'{period}_ma'] - gap * df[f'{period}bb_stddev']  # lower band
        return df

    def get_bb_last_upper_value(self, code, period, gap):
        """Return the last value of the upper band."""
        df = self.get_bb_df(code, period, gap)
        return df[f'{period}bb_upper'].iloc[-1]

    def get_bb_last_lower_value(self, code, period, gap):
        """Return the last value of the lower band."""
        df = self.get_bb_df(code, period, gap)
        return df[f'{period}bb_lower'].iloc[-1]
class Sto:
    """Stochastic oscillator (fast %K and slow %K)."""

    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()

    def get_fast_k_slow_k_df(self, code, period, fast_k_period, slow_k_period):
        """Return the OHLCV frame with fast-%K and slow-%K columns added.

        Args:
            code: Ticker or index code.
            period: Number of days requested from the data source.
            fast_k_period: Window for the rolling high/low used by fast %K.
            slow_k_period: Window for the rolling mean of fast %K.

        Typical combinations noted by the original author: 5,3,3 / 10,6,6 /
        20,12,12.

        The fast-%K column is named ``'fask_k_{fast_k_period}'``. The spelling
        is kept as-is because it is a runtime key that other code may read.
        """
        try:
            df = self.sd.get_ohlcv_df(code, period)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are not swallowed.
            df = self.id.get_ohlcv_df(code, period)
        high_col = f'{fast_k_period}days_high'
        low_col = f'{fast_k_period}days_low'
        fast_col = f'fask_k_{fast_k_period}'
        df[high_col] = df['high'].rolling(window=fast_k_period, min_periods=1).max()
        df[low_col] = df['low'].rolling(window=fast_k_period, min_periods=1).min()
        # Position of the close inside the rolling high/low range, in percent.
        df[fast_col] = ((df['close'] - df[low_col]) / (df[high_col] - df[low_col])) * 100
        df[f'slow_k_{slow_k_period}'] = df[fast_col].rolling(window=slow_k_period, min_periods=1).mean()
        return df

    def get_last_fast_k(self, code, period, fast_k_period, slow_k_period):
        """Return the last fast-%K value."""
        df = self.get_fast_k_slow_k_df(code, period, fast_k_period, slow_k_period)
        return df[f'fask_k_{fast_k_period}'].iloc[-1]

    def get_last_slow_k(self, code, period, fast_k_period, slow_k_period):
        """Return the last slow-%K value."""
        df = self.get_fast_k_slow_k_df(code, period, fast_k_period, slow_k_period)
        return df[f'slow_k_{slow_k_period}'].iloc[-1]
class ATR:
    """Average True Range, read from the local DB with an API fallback."""

    def __init__(self):
        # DB connection
        self.db_name = "stock_analysis"
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or environment variables.
        self.engine = pymysql.connect(host='localhost', user='root', password='njmr0623$$', db=f'{self.db_name}', charset='utf8')
        self.table_name = "daily_price_1y"
        # data-source helpers
        self.sd = Stock_data()
        self.id = Index_data()

    def _read_from_db(self, code, period):
        """Return DB rows for ``code`` from ``period * 1.7`` days ago, or None on error.

        The query is parameterized. The previous string-built SQL inserted
        the date unquoted (``date >= 2023-10-01`` is evaluated as arithmetic
        by SQL) and the code unquoted.
        """
        try:
            start_date = (dt.datetime.now() - dt.timedelta(days=period * 1.7)).strftime('%Y-%m-%d')
            # Only the table name (a constant set in __init__) is interpolated.
            query = f"select * from {self.table_name} where code=%s and date >= %s"
            return pd.read_sql(query, con=self.engine, params=(code, start_date))
        except Exception:
            # Best effort, as before: any DB problem just triggers the fallback.
            return None

    def get_atr_df(self, code, period):
        """Return a price frame with an added ``'atr_{period}'`` column.

        Rows come from the DB when it returns any; otherwise from
        ``Stock_data`` and, if that raises, from ``Index_data``. The ATR
        column is computed by ``talib.ATR`` with ``timeperiod=period``.

        NOTE(review): with the date filter now effective the DB frame only
        spans about ``period * 1.7`` calendar days. Confirm that this leaves
        enough rows for the ATR warm-up, or widen the window.
        """
        df = self._read_from_db(code, period)
        if df is None or len(df) == 0:
            try:
                df = self.sd.get_ohlcv_df(code, period)
            except Exception:
                df = self.id.get_ohlcv_df(code, period)
        df[f'atr_{period}'] = talib.ATR(high=df['high'], low=df['low'], close=df['close'], timeperiod=period)
        return df

    def get_atr_value_ndays(self, code, period):
        """Return the last value of the ``'atr_{period}'`` column."""
        df = self.get_atr_df(code, period)
        return df[f'atr_{period}'].iloc[-1]
class Market_timing:
    """Market-timing signals built from current price, moving averages and ranges."""

    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()
        self.ma = Ma()
        self.atr = ATR()
        self.hl = High_Low()

    def _current_price(self, code):
        """Return the current price from the index source, else the stock source.

        The stock source is used when the index source raises or returns None.
        """
        try:
            current_price = self.id.get_current_price(code)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are not swallowed.
            return self.sd.get_current_price(code)
        if current_price is None:
            current_price = self.sd.get_current_price(code)
        return current_price

    def compare_to_ndays_ma(self, code='KQ11', ma_days=5):
        """Return False when the current price is below the ``ma_days`` SMA, else True."""
        current_price = self._current_price(code)
        ma_days_value = self.ma.get_sma_value(code, ma_days)
        return not (current_price < ma_days_value)

    def mt_3_5_10(self, code='KQ11'):
        """Return False only when the price is below all of the 3/5/10-day SMAs."""
        current_price = self._current_price(code)
        ma_3_value = self.ma.get_sma_value(code, 3)
        ma_5_value = self.ma.get_sma_value(code, 5)
        ma_10_value = self.ma.get_sma_value(code, 10)
        print(current_price, ma_3_value, ma_5_value, ma_10_value)
        below_all = (
            (current_price < ma_3_value)
            and (current_price < ma_5_value)
            and (current_price < ma_10_value)
        )
        return not below_all

    def compare_to_ndays_ago(self, code='KQ11', period=20):
        """Return True when the current price is above the price ``period`` rows back.

        ``Stock_data.get_price_before_ndays`` uses its ``days`` argument as an
        ``iloc`` offset (default ``-20``), so the offset must be negative to
        look backwards. The previous code passed the positive ``period``,
        which indexes forward from the start of the frame.
        """
        current_price = self._current_price(code)
        ndays_ago_price = self.sd.get_price_before_ndays(code, -abs(period))
        return bool(current_price > ndays_ago_price)

    def break_throgh_high_low_price_ndays_ndays(self, code='KQ11', high_period=20, low_period=10):
        """Return a breakout signal against the recent high and low.

        Returns:
            True when the price is above the ``high_period`` high, False when
            it is below the ``low_period`` low, and the string ``"nothing"``
            otherwise.
        """
        current_price = self._current_price(code)
        high_price = self.hl.get_last_high_price(code, high_period)
        low_price = self.hl.get_last_low_price(code, low_period)
        print(current_price, high_price, low_price)
        if current_price > high_price:
            return True
        if current_price < low_price:
            return False
        return "nothing"
class High_Low:
    """Rolling high / low prices over a period."""

    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()

    def get_high_low_df(self, code='KQ11', period=20):
        """Return the OHLCV frame with ``'{period}_high'`` and ``'{period}_low'`` added.

        The columns hold the rolling max of ``high`` and the rolling min of
        ``low``. ``min_periods=1`` lets them use however many rows exist;
        without it ``rolling`` yields NaN until ``period`` rows are available.
        """
        try:
            df = self.sd.get_ohlcv_df(code, period)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are not swallowed.
            df = self.id.get_ohlcv_df(code, period)
        df[f'{period}_high'] = df['high'].rolling(window=period, min_periods=1).max()
        df[f'{period}_low'] = df['low'].rolling(window=period, min_periods=1).min()
        print(df)
        return df

    def get_last_high_low_price(self, code, period):
        """Return ``(high, low)`` read from the second-to-last row.

        NOTE(review): an original comment said the current day's value should
        be used, but the code reads ``iloc[-2]``. Confirm which is intended.
        """
        df = self.get_high_low_df(code, period)
        high_value = df[f'{period}_high'].iloc[-2]
        low_value = df[f'{period}_low'].iloc[-2]
        return high_value, low_value

    def get_last_high_price(self, code, period):
        """Return the rolling high from the second-to-last row."""
        high_price, _ = self.get_last_high_low_price(code, period)
        return high_price

    def get_last_low_price(self, code, period):
        """Return the rolling low from the second-to-last row."""
        _, low_price = self.get_last_high_low_price(code, period)
        return low_price
class Today_stocks_from_DM:
    """Today's picks produced by the dual-momentum batch job."""

    def __init__(self):
        pass

    def get_today_stock(self):
        """Run the batch file, read today's CSV and return 'A'-prefixed codes.

        The CSV name is built from today's year, month and day. Its contents
        are sent through ``hk.send_message`` before the ``code`` column is
        converted to a list.
        """
        subprocess.call(r'"C:\Users\njpop\PycharmProjects\Stock_Analysis_py38_64\자동시작 bat 파일\today_stocks.bat"')
        today = dt.datetime.now()
        csv_name = f'today_stocks_{today.year}_{today.month}_{today.day}.csv'
        # dtype=object keeps the codes as text (original note: to keep them 6 digits long)
        picks = pd.read_csv(csv_name, dtype=object, encoding='cp949')
        announcement = (
            f'==============================',
            f'[오늘의 종목]',
            f'{picks}',
            f'==============================',
        )
        for message in announcement:
            hk.send_message(message)
        # DataFrame -> plain list of code strings
        codes = picks.astype(str)['code'].tolist()
        purchase_list_a_form = ['A' + str(stock_code) for stock_code in codes]
        print(f'purchase 리스트 : {purchase_list_a_form}')
        return purchase_list_a_form
if __name__ == '__main__':
    # Manual smoke test: build every helper, then print the OHLCV frame and
    # current price for one sample code.
    # (Uncomment to print full frames.)
    # pd.options.display.max_rows = None
    # pd.options.display.max_columns = None
    sd = Stock_data()
    ma = Ma()
    atr = ATR()
    mt = Market_timing()
    hl = High_Low()
    bb = Bb()
    naver_c = Naver()
    pykrx_c = Pykrx()

    sample_code = "520057"
    ohlcv_frame = sd.get_ohlcv_df(sample_code, 5)
    print(ohlcv_frame)
    latest_price = sd.get_current_price(sample_code)
    print(latest_price)
# To embed this project on your website, copy the following code and paste it into your website's HTML: