#####################################################################################
"""
1. 공통적으로 사용되는 주가분석 등의 함수들의 모음
[변경이력]
(23.07.29) ATR class 추가_https://technical-analysis-library-in-python.readthedocs.io/en/latest/ta.html#volatility-indicators
(23.10.06) 마켓타이밍 class MT 추가
(23.10.29) stock_data, index_data 새로 만들기
(23.11.05)
    한국투자도 같이 사용하기
(23.11.15)
    etn 현재가를 못읽어 온다.
"""
######################################################################################

import pandas as pd
import datetime as dt
import subprocess
import Util_Hankook_KR_V3_devel_231110 as hk
import FinanceDataReader as fdr
import pymysql, time
import talib
import pykrx


# pd.set_option('display.max_columns', None) ## 모든 열을 출력한다
# pd.set_option('display.max_rows', None) ## 모든 열을 출력한다


class Stock_data :
    def __init__(self):
        pass
        self.hk_sd=hk.Stock_data()
        self.fdr_c = Fdr()

    # 개별주식용 #
    def get_ohlcv_df(self, code, period): #df 중 close 활용
        time.sleep(0.1)

        try:
            df = self.hk_sd.get_ohlcv_df(code,period)
        except :
            print(f'FDR로 구합니다')
            df = self.fdr_c.get_stock_ohlcv_df(code,period)

        return df

    def get_current_price(self, code): #df 중 close 활용
        time.sleep(0.1)

        try:
            current_price = self.hk_sd.get_current_price(code)
            print(current_price)

            if current_price == None or '0':
                df = self.get_ohlcv_df(code, period=5)
                current_price = df['close'].iloc[-1]
        except:
            df=self.get_ohlcv_df(code,period=5)
            current_price = df['close'].iloc[-1]

        current_price = int(current_price)

        return current_price

    def get_price_before_ndays(self, code, days=-20):  # df 중 close 활용
        time.sleep(0.1)
        period = abs(days)
        df=self.get_ohlcv_df(code,period)
        price = df['close'].iloc[days]
        return price





class Index_data :
    def __init__(self):
        pass
        self.hk_id = hk.Index_data()
        self.fdr_c = Fdr()


    # index or 미국주식용 : Adj Close 사용해야함 #
    def get_ohlcv_df(self, code, period):  # df 중 close 활용
        time.sleep(0.1)

        try :
            df = self.hk_id.get_ohlcv_df(code,period)
        except:
            df = self.fdr_c.get_index_ohlcv_df(code,period)

        return df



    def get_current_price(self, code):  # df 중 close 활용
        time.sleep(0.1)
        df = self.get_ohlcv_df(code,5)
        current_value = df['close'].iloc[-1]
        return current_value


    def get_price_before_ndays(self, code, days=-20):  # df 중 close 활용
        time.sleep(0.1)
        period = int(abs(days)*1.7)
        df = self.get_ohlcv_df(code,period)
        current_value = df['close'].iloc[days]
        return current_value




class Etf_data :
    def __init__(self):
        pass
        self.hk_id = hk.Index_data()
        self.fdr_c = Fdr()
        self.naver_c= Naver()
        self.pykrx_c = Pykrx()


    def get_etf_nav(self,code):

        try:
            etf_nav = self.naver_c.get_etf_nav(code)
        except:
            etf_nav = self.pykrx_c.get_etf_nav(code)

        return etf_nav




class Pykrx :
    def __init__(self):
        pass
        self.hk_sd=hk.Stock_data()
        self.fdr_c = Fdr()

    def get_etf_nav(self, code):  # df 중 close 활용
        time.sleep(0.3)

        now = dt.datetime.now()
        start_date = now - dt.timedelta(days=5)
        start_date_str = start_date.strftime("%Y-%m-%d")
        end_date = now
        end_date_str = now.strftime("%Y-%m-%d")
        df = pykrx.stock.get_etf_price_deviation(start_date_str, end_date_str, code)
        nav = df['NAV'][-1]

        return nav



    def get_etf_nav_gap_rate_avg(self, code,period):  # 괴리율이라 %값이다
        time.sleep(0.3)

        now = dt.datetime.now()
        start_date = now - dt.timedelta(days=int(period*1.7))
        start_date_str = start_date.strftime("%Y-%m-%d")
        end_date = now
        end_date_str = now.strftime("%Y-%m-%d")

        df = pykrx.stock.get_etf_price_deviation(start_date_str, end_date_str, code)
        print(df)

        TotalGap = 0

        for idx, row in df.iterrows():
            Gap = abs(float(row['괴리율'])) #이걸절대값으로 하는게 맞나?

            TotalGap += Gap

        GapAvg = TotalGap / len(df)

        print("GapAvg", GapAvg)

        return GapAvg




class Fdr :
    def __init__(self):
        pass

    def get_stock_ohlcv_df(self,code,period):
            time.sleep(0.3)
            period = int((period*1.5)+1)
            now = dt.datetime.now() - dt.timedelta(days=period)
            start_date = now.strftime('%Y-%m-%d')
            df = fdr.DataReader(code, start_date)
            df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
            df = df.dropna()
            df = df.rename(columns={'Open': 'open', 'High': 'high', 'Low': 'low', 'Close': 'close',  'Volume': 'volume',})
            df = df[['open', 'high', 'low', 'close','volume']]
            return df


    def get_index_ohlcv_df(self,code,period):
            time.sleep(0.3)
            period = int((period * 1.5) + 1)
            now = dt.datetime.now() - dt.timedelta(days=period)
            start_date = now.strftime('%Y-%m-%d')
            df = fdr.DataReader(code, start_date)
            df = df[['Open', 'High', 'Low', 'Adj Close','Volume']]
            df = df.dropna()
            df = df.rename(columns={'Open': 'open', 'High': 'high', 'Low': 'low', 'Adj Close': 'close', 'Volume':'volume'})
            df = df[['open', 'high', 'low', 'close','volume']]
            return df




class Naver :
    def __init__(self):
        pass

    def get_etf_nav(self,code):
        time.sleep(0.3)


        url = "https://[Log in to view URL]" + code
        dfs = pd.read_html(url, encoding='euc-kr')
        # pprint.pprint(dfs)

        data_dict = dfs[8]

        '''
        data_keys = list(data_dict.keys())
        for key in data_keys:
            print("key:",key)
            print("data_dict[key]:",data_dict[key])

            Second_Key = list(data_dict[key].keys())
            for secondkey in Second_Key:
                print("secondkey:",secondkey)
                print("data_dict[key][secondkey]:", data_dict[key][secondkey])
        '''

        Nav = int(data_dict[1][0])

        return Nav

    #네이버는 1페이지라 10일체 데이터밖에 안된다. period가 의미가 없다.
    def get_etf_nav_gap_rate_avg(self,code,period):

        time.sleep(0.3)

        url = "https://[Log in to view URL]" + code
        dfs = pd.read_html(url, encoding='euc-kr')

        data_dict = dfs[4]

        data_list = data_dict["괴리율"].to_list()
        print(data_list)

        count = 0
        TotalGap = 0
        for data in data_list:
            if "%" in str(data):
                Gap = abs(float(data.replace('%', ''))) #이걸 절대값으로 하는게 맞나?
                TotalGap += Gap
                count += 1

        GapAvg = TotalGap / count

        return GapAvg




class Stock_data_using_db :

    def __init__(self):
        # DB 연결
        self.db_name = "stock_analysis"
        self.engine = pymysql.connect(host='localhost', user='root', password='njmr0623$$', db=f'{self.db_name}',
                                      charset='utf8')
        self.table_name = "daily_price_1y"  # 1년짜리 사용

        #한국투자증권용 사용
        self.hk_sd=hk.Stock_data()



    def get_stock_ohlc_df(self, code, period=20): #df 중 close 활용
        time.sleep(0.1)
        now = dt.datetime.now() - dt.timedelta(period * 2)
        start_date = now.strftime('%Y-%m-%d')

        df = pd.read_sql(f"select * from {self.table_name} where code={code} and date >= {start_date}", con=self.engine)

        #디비로 구한게 업다면 아래 Fdr로 수행 수행
        if len(df) == 0:
            print(f'FDR로 구하기')
            df = fdr.DataReader(code,start_date)
            df = df[['Open', 'High', 'Low', 'Close','Volume']]
            df = df.dropna()
            df = df.rename(columns={'Open':'open', 'High':'high', 'Low':'low', 'Close':'close', 'Volume':'volume'})

        df = df[['open','high','low','close','volume']]

        return df



    def get_current_price(self, code): #df 중 close 활용
        time.sleep(0.1)
        df=self.get_stock_ohlc_df(code)
        current_value = df['close'].iloc[-1]
        return current_value



    def get_price_before_ndays(self, code, days=-20):  # df 중 close 활용
        time.sleep(0.1)
        df=self.get_stock_ohlc_df(code)
        price = df['close'].iloc[days-1]
        return price


    def get_volume_before_ndays(self, code, days):  # df 중 close 활용
        time.sleep(0.1)
        df=self.get_stock_ohlc_df(code)
        volume = df['volume'].iloc[days]
        return volume






class Ma : #moving Average, 이동평균선
    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()

    # 단순 이동평균 ######################
    def get_sma_df(self, code, period): #df 중 close 활용

        try:
            df = self.sd.get_ohlcv_df(code, period)
        except:
            df = self.id.get_ohlcv_df(code, period)

        df[f'{period}_ma'] = df['close'].rolling(window=period, min_periods=1).mean()  # min period 안 넣으면 안되네...왜지???

        return df

    def get_sma_value(self, code, period): #이평 마지막 값 추출
        df=self.get_sma_df(code, period)
        ma_value = df[f'{period}_ma'].iloc[-1]
        return ma_value

    # 지수 이동평균 ######################
    def get_ema_df(self,code,period): #df 중 close 활용

        try:
            df = self.sd.get_ohlcv_df(code, period)
        except:
            df = self.id.get_ohlcv_df(code, period)

        df[f'{period}_ema'] = talib.EMA(df['close'],period)

        return df

    def get_ema_value(self, code, period): #이평 마지막 값 추출
        df=self.get_ema_df(code,period)
        ema_value = df[f'{period}_ema'].iloc[-1]
        return ema_value




class Bb: #볼린저밴드
    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()



    def get_bb_df(self, code, period, gap): #gap은 표준편차 예를들어 0.5, 1, 2 표준편차
        try:
            df = self.sd.get_ohlcv_df(code, period)
        except:
            df = self.id.get_ohlcv_df(code, period)
        df[f'{period}_ma'] = df['close'].rolling(window=period, min_periods=1).mean()  # min period 안 넣으면 안되네...왜지???
        df[f'{period}bb_stddev'] = df['close'].rolling(window=period, min_periods=1).std()  # 120일 이동표준편차
        df[f'{period}bb_upper'] = df[f'{period}_ma'] + gap * df[f'{period}bb_stddev']  # 상단밴드
        df[f'{period}bb_lower'] = df[f'{period}_ma'] - gap * df[f'{period}bb_stddev']  # 하단밴드
        return df


    def get_bb_last_upper_value(self, code, period, gap):
        df = self.get_bb_df(code, period, gap)
        upper_value = df[f'{period}bb_upper'].iloc[-1]
        return upper_value


    def get_bb_last_lower_value(self, code, period, gap):
        df=self.get_bb_df(code, period, gap)
        low_value = df[f'{period}bb_lower'].iloc[-1]
        return low_value



class Sto:  # 스토캐스틱
    def __init__(self):
        self.sd = Stock_data()
        self.id = Index_data()

    def get_fast_k_slow_k_df(self, code, period, fast_k_period, slow_k_period): # 5,3,3 / 10,6,6, / 20,12,12 이런식 사용
        try:
            df = self.sd.get_ohlcv_df(code, period)
        except:
            df = self.id.get_ohlcv_df(code, period)

        df[f'{fast_k_period}days_high'] = df['high'].rolling(window=fast_k_period, min_periods=1).max()
        df[f'{fast_k_period}days_low'] = df['low'].rolling(window=fast_k_period, min_periods=1).min()
        df[f'fask_k_{fast_k_period}'] = ((df['close'] - df[f'{fast_k_period}days_low']) / (df[f'{fast_k_period}days_high'] - df[f'{fast_k_period}days_low'])) * 100
        df[f'slow_k_{slow_k_period}'] = df[f'fask_k_{fast_k_period}'].rolling(window=slow_k_period, min_periods=1).mean()
        return df

    def get_last_fast_k(self, code, period, fast_k_period, slow_k_period):
        df = self.get_fast_k_slow_k_df(code, period, fast_k_period, slow_k_period)
        fast_k = df[f'fask_k_{fast_k_period}'].iloc[-1]
        return fast_k

    def get_last_slow_k(self, code, period, fast_k_period, slow_k_period):
        df=self.get_fast_k_slow_k_df(code, period, fast_k_period, slow_k_period)
        slow_k = df[f'slow_k_{slow_k_period}'].iloc[-1]
        return slow_k




class ATR:  # using DB
    def __init__(self):
        # DB 연결
        self.db_name = "stock_analysis"
        self.engine = pymysql.connect(host='localhost', user='root', password='njmr0623$$', db=f'{self.db_name}', charset='utf8')
        self.table_name = "daily_price_1y"

        #클래스선언
        self.sd = Stock_data()
        self.id = Index_data()


    def get_atr_df(self, code, period):  # df 중 close 활용

        try:
            now = dt.datetime.now()
            start_date = now - dt.timedelta(days=period*1.7)
            start_date = start_date.strftime('%Y-%m-%d')
            df = pd.read_sql(f"select * from {self.table_name} where code={code} and date >= {start_date}", con=self.engine)
            if len(df)==0:
                try:
                    df = self.sd.get_ohlcv_df(code, period)
                except:
                    df = self.id.get_ohlcv_df(code, period)
        except:
            try:
                df = self.sd.get_ohlcv_df(code, period)
            except:
                df = self.id.get_ohlcv_df(code, period)

        df[f'atr_{period}'] = talib.ATR(high=df['high'],low=df['low'],close=df['close'],timeperiod=period)

        return df

    def get_atr_value_ndays(self, code, period): #이평 마지막 값 추출
        df=self.get_atr_df(code,period)
        atr = df[f'atr_{period}'].iloc[-1]
        return atr




class Market_timing: #마켓타이밍
    def __init__(self):
        pass
        self.sd = Stock_data()
        self.id = Index_data()
        self.ma = Ma()
        self.atr = ATR()
        self.hl = High_Low()


    def compare_to_ndays_ma(self, code='KQ11', ma_days=5):  # 이동평균
        try:
            current_price = self.id.get_current_price(code)
        except:
            current_price=self.sd.get_current_price(code)

        ma_days_value = self.ma.get_sma_value(code, ma_days)

        if (current_price < ma_days_value) :
            buy_signal = False
        else:
            buy_signal = True
        return buy_signal

    def mt_3_5_10(self, code='KQ11'):  # df는  ohlc만
        try:
            current_price = self.id.get_current_price(code)
            if current_price == None:
                current_price = self.sd.get_current_price(code)
        except:
            current_price = self.sd.get_current_price(code)

        ma_3_value = self.ma.get_sma_value(code, 3)
        ma_5_value = self.ma.get_sma_value(code, 5)
        ma_10_value = self.ma.get_sma_value(code, 10)

        print(current_price,ma_3_value,ma_5_value,ma_10_value)

        if (current_price < ma_3_value) and (current_price < ma_5_value) and (current_price < ma_10_value):
            buy_signal = False
        else:
            buy_signal = True
        return buy_signal

    def compare_to_ndays_ago(self, code='KQ11', period=20):  # df는  ohlc만
        try:
            current_price = self.id.get_current_price(code)
            if current_price == None:
                current_price = self.sd.get_current_price(code)
        except:
            current_price = self.sd.get_current_price(code)

        ndays_ago_price = self.sd.get_price_before_ndays(code, period)

        if current_price > ndays_ago_price:
            buy_signal = True
        else:
            buy_signal = False

        return buy_signal


    def break_throgh_high_low_price_ndays_ndays(self, code='KQ11', high_period=20, low_period=10):  # df는  ohlc만
        try:
            current_price = self.id.get_current_price(code)
            if current_price == None:
                current_price = self.sd.get_current_price(code)
        except:
            current_price = self.sd.get_current_price(code)

        high_price = self.hl.get_last_high_price(code,high_period)
        low_price = self.hl.get_last_low_price(code, low_period)

        print(current_price,high_price,low_price)

        if current_price > high_price:
            signal = True
        elif current_price < low_price:
            signal = False
        else :
            signal ="nothing"

        return signal




class High_Low : #고가, 저가 구하기
    def __init__(self):
        pass
        self.sd= Stock_data()
        self.id = Index_data()

    def get_high_low_df(self,code='KQ11',period=20): #df 중 close 활용
        try:
            df = self.sd.get_ohlcv_df(code, period)
        except:
            df = self.id.get_ohlcv_df(code,period)

        df[f'{period}_high'] = df['high'].rolling(window=period, min_periods=1).max()  # min period 안 넣으면 안되네...왜지???
        df[f'{period}_low'] = df['low'].rolling(window=period, min_periods=1).min()  # min period 안 넣으면 안되네...왜지???
        print(df)
        return df

    def get_last_high_low_price(self, code, period): #이평 마지막 값 추출
        df=self.get_high_low_df(code,period)
        high_value = df[f'{period}_high'].iloc[-2]
        low_value = df[f'{period}_low'].iloc[-2]
        return high_value,low_value


    def get_last_high_price(self, code, period): #이평 마지막 값 추출
        df=self.get_high_low_df(code,period)
        high_price = df[f'{period}_high'].iloc[-2] # 당일껄로 가져와야한다.
        low_price = df[f'{period}_low'].iloc[-2]
        return high_price




    def get_last_low_price(self, code, period): #이평 마지막 값 추출
        df=self.get_high_low_df(code,period)
        high_price = df[f'{period}_high'].iloc[-2]
        low_price = df[f'{period}_low'].iloc[-2]
        return low_price





class Today_stocks_from_DM: #듀얼모멘텀으로 구한 오늘의 종목
    def __init__(self):
        pass
    def get_today_stock(self):
        subprocess.call(r'"C:\Users\njpop\PycharmProjects\Stock_Analysis_py38_64\자동시작 bat 파일\today_stocks.bat"')
        now = dt.datetime.now()
        y = now.year
        m = now.month
        d = now.day
        purchase_list = pd.read_csv(f'today_stocks_{y}_{m}_{d}.csv', dtype=object,
                                    encoding='cp949')  # 코드 6자리로 만들려고 타입지정
        hk.send_message(f'==============================')
        hk.send_message(f'[오늘의 종목]')
        hk.send_message(f'{purchase_list}')
        hk.send_message(f'==============================')
        purchase_list = purchase_list.astype(str)
        purchase_list = purchase_list['code'].tolist()  # stock analysis에서 만들어진 듀얼모멘텀 종목, 데이터프레임 → 리스트형으로
        purchase_list_a_form = []
        for list in purchase_list:
            temp = 'A' + str(list)
            purchase_list_a_form.append(temp)
        print(f'purchase 리스트 : {purchase_list_a_form}')

        return purchase_list_a_form




if __name__ == '__main__':

    # pd.options.display.max_rows = None
    # pd.options.display.max_columns = None
    #

    sd = Stock_data()
    ma = Ma()
    atr = ATR()
    mt = Market_timing()
    hl= High_Low()
    bb=Bb()
    naver_c = Naver()
    pykrx_c =Pykrx()




    print(sd.get_ohlcv_df("520057",5))
    print(sd.get_current_price("520057"))









Embed on website

To embed this project on your website, copy the following code and paste it into your website's HTML: