神经网络的逻辑应该都是熟知的了,在这里想说明一下交叉验证
交叉验证方法:
看图大概就能理解了,大致就是先将数据集分成K份,对这K份中每一份都取不一样的比例数据进行训练和测试。得出K个误差,将这K个误差平均得到最终误差
这第一个部分是BP神经网络的建立
参数选取参照论文:基于数据挖掘技术的股价指数分析与预测研究_胡林林
import math import random import tushare as ts import pandas as pd random.seed(0) def getData(id,start,end): df = ts.get_hist_data(id,start,end) DATA=pd.DataFrame(columns=['rate1', 'rate2','rate3','pos1','pos2','pos3','amt1','amt2','amt3','MA20','MA5','r']) P1 = pd.DataFrame(columns=['high','low','close','open','volume']) DATA2=pd.DataFrame(columns=['R']) DATA['MA20']=df['ma20'] DATA['MA5']=df['ma5'] P=df['close'] P1['high']=df['high'] P1['low']=df['low'] P1['close']=df['close'] P1['open']=df['open'] P1['volume']=df['volume'] DATA['rate1']=(P1['close'].shift(1)-P1['open'].shift(1))/P1['open'].shift(1) DATA['rate2']=(P1['close'].shift(2)-P1['open'].shift(2))/P1['open'].shift(2) DATA['rate3']=(P1['close'].shift(3)-P1['open'].shift(3))/P1['open'].shift(3) DATA['pos1']=(P1['close'].shift(1)-P1['low'].shift(1))/(P1['high'].shift(1)-P1['low'].shift(1)) DATA['pos2']=(P1['close'].shift(2)-P1['low'].shift(2))/(P1['high'].shift(2)-P1['low'].shift(2)) DATA['pos3']=(P1['close'].shift(3)-P1['low'].shift(3))/(P1['high'].shift(3)-P1['low'].shift(3)) DATA['amt1']=P1['volume'].shift(1)/((P1['volume'].shift(1)+P1['volume'].shift(2)+P1['volume'].shift(3))/3) DATA['amt2']=P1['volume'].shift(2)/((P1['volume'].shift(2)+P1['volume'].shift(3)+P1['volume'].shift(4))/3) DATA['amt3']=P1['volume'].shift(3)/((P1['volume'].shift(3)+P1['volume'].shift(4)+P1['volume'].shift(5))/3) templist=(P-P.shift(1))/P.shift(1) tempDATA = [] for indextemp in templist: tempDATA.append(1/(1+math.exp(-indextemp*100))) DATA['r'] = tempDATA DATA=DATA.dropna(axis=0) DATA2['R']=DATA['r'] del DATA['r'] DATA=DATA.T DATA2=DATA2.T DATAlist=DATA.to_dict("list") result = [] for key in DATAlist: result.append(DATAlist[key]) DATAlist2=DATA2.to_dict("list") result2 = [] for key in DATAlist2: result2.append(DATAlist2[key]) return result def getDataR(id,start,end): df = ts.get_hist_data(id,start,end) DATA=pd.DataFrame(columns=['rate1', 'rate2','rate3','pos1','pos2','pos3','amt1','amt2','amt3','MA20','MA5','r']) P1 = pd.DataFrame(columns=['high','low','close','open','volume']) DATA2=pd.DataFrame(columns=['R']) DATA['MA20']=df['ma20'].shift(1) DATA['MA5']=df['ma5'].shift(1) P=df['close'] P1['high']=df['high'] P1['low']=df['low'] P1['close']=df['close'] P1['open']=df['open'] P1['volume']=df['volume'] DATA['rate1']=(P1['close'].shift(1)-P1['open'].shift(1))/P1['open'].shift(1) DATA['rate2']=(P1['close'].shift(2)-P1['open'].shift(2))/P1['open'].shift(2) DATA['rate3']=(P1['close'].shift(3)-P1['open'].shift(3))/P1['open'].shift(3) DATA['pos1']=(P1['close'].shift(1)-P1['low'].shift(1))/(P1['high'].shift(1)-P1['low'].shift(1)) DATA['pos2']=(P1['close'].shift(2)-P1['low'].shift(2))/(P1['high'].shift(2)-P1['low'].shift(2)) DATA['pos3']=(P1['close'].shift(3)-P1['low'].shift(3))/(P1['high'].shift(3)-P1['low'].shift(3)) DATA['amt1']=P1['volume'].shift(1)/((P1['volume'].shift(1)+P1['volume'].shift(2)+P1['volume'].shift(3))/3) DATA['amt2']=P1['volume'].shift(2)/((P1['volume'].shift(2)+P1['volume'].shift(3)+P1['volume'].shift(4))/3) DATA['amt3']=P1['volume'].shift(3)/((P1['volume'].shift(3)+P1['volume'].shift(4)+P1['volume'].shift(5))/3) templist=(P-P.shift(1))/P.shift(1) tempDATA = [] for indextemp in templist: tempDATA.append(1/(1+math.exp(-indextemp*100))) DATA['r'] = tempDATA DATA=DATA.dropna(axis=0) DATA2['R']=DATA['r'] del DATA['r'] DATA=DATA.T DATA2=DATA2.T DATAlist=DATA.to_dict("list") result = [] for key in DATAlist: result.append(DATAlist[key]) DATAlist2=DATA2.to_dict("list") result2 = [] for key in DATAlist2: result2.append(DATAlist2[key]) return result2 def rand(a, b): return (b - a) * random.random() + a def make_matrix(m, n, fill=0.0): mat = [] for i in range(m): mat.append([fill] * n) return mat def sigmoid(x): return 1.0 / (1.0 + math.exp(-x)) def sigmod_derivate(x): return x * (1 - x) class BPNeuralNetwork: def __init__(self): self.input_n = 0 self.hidden_n = 0 self.output_n = 0 self.input_cells = [] self.hidden_cells = [] self.output_cells = [] self.input_weights = [] self.output_weights = [] self.input_correction = [] self.output_correction = [] def setup(self, ni, nh, no): self.input_n = ni + 1 self.hidden_n = nh self.output_n = no # init cells self.input_cells = [1.0] * self.input_n self.hidden_cells = [1.0] * self.hidden_n self.output_cells = [1.0] * self.output_n # init weights self.input_weights = make_matrix(self.input_n, self.hidden_n) self.output_weights = make_matrix(self.hidden_n, self.output_n) # random activate for i in range(self.input_n): for h in range(self.hidden_n): self.input_weights[i][h] = rand(-0.2, 0.2) for h in range(self.hidden_n): for o in range(self.output_n): self.output_weights[h][o] = rand(-2.0, 2.0) # init correction matrix self.input_correction = make_matrix(self.input_n, self.hidden_n) self.output_correction = make_matrix(self.hidden_n, self.output_n) def predict(self, inputs): # activate input layer for i in range(self.input_n - 1): self.input_cells[i] = inputs[i] # activate hidden layer for j in range(self.hidden_n): total = 0.0 for i in range(self.input_n): total += self.input_cells[i] * self.input_weights[i][j] self.hidden_cells[j] = sigmoid(total) # activate output layer for k in range(self.output_n): total = 0.0 for j in range(self.hidden_n): total += self.hidden_cells[j] * self.output_weights[j][k] self.output_cells[k] = sigmoid(total) return self.output_cells[:] def back_propagate(self, case, label, learn, correct): # feed forward self.predict(case) # get output layer error output_deltas = [0.0] * self.output_n for o in range(self.output_n): error = label[o] - self.output_cells[o] output_deltas[o] = sigmod_derivate(self.output_cells[o]) * error # get hidden layer error hidden_deltas = [0.0] * self.hidden_n for h in range(self.hidden_n): error = 0.0 for o in range(self.output_n): error += output_deltas[o] * self.output_weights[h][o] hidden_deltas[h] = sigmod_derivate(self.hidden_cells[h]) * error # update output weights for h in range(self.hidden_n): for o in range(self.output_n): change = output_deltas[o] * self.hidden_cells[h] self.output_weights[h][o] += learn * change + correct * self.output_correction[h][o] self.output_correction[h][o] = change # update input weights for i in range(self.input_n): for h in range(self.hidden_n): change = hidden_deltas[h] * self.input_cells[i] self.input_weights[i][h] += learn * change + correct * self.input_correction[i][h] self.input_correction[i][h] = change # get global error error = 0.0 for o in range(len(label)): error += 0.5 * (label[o] - self.output_cells[o]) ** 2 return error def train(self, cases, labels, limit=10000, learn=0.05, correct=0.1): for i in range(limit): error = 0.0 for i in range(len(cases)): label = labels[i] case = cases[i] error += self.back_propagate(case, label, learn, correct) def test(self,id): result=getData("000001", "2015-01-05", "2015-01-09") result2=getDataR("000001", "2015-01-05", "2015-01-09") self.setup(11, 5, 1) self.train(result, result2, 10000, 0.05, 0.1) for t in resulttest: print(self.predict(t))
下面是选取14-15年数据进行训练,16年数据作为测试集,调仓周期为20个交易日,大约1个月,对上证50中的股票进行预测,选取预测的涨幅前10的股票买入,对每只股票分配一样的资金,初步运行没有问题,但就是太慢了,等哪天有空了再运行
import BPnet import tushare as ts import pandas as pd import math import xlrd import datetime as dt import time # #nn =BPnet.BPNeuralNetwork() #nn.test('000001') #for i in ts.get_sz50s()['code']: holdList=pd.DataFrame(columns=['time','id','value']) share=ts.get_sz50s()['code'] time2=ts.get_k_data('000001')['date'] newtime = time2[400:640] newcount=0 for itime in newtime: print(itime) if newcount % 20 == 0: sharelist = pd.DataFrame(columns=['time','id','value']) for ishare in share: backwardtime = time.strftime('%Y-%m-%d',time.localtime(time.mktime(time.strptime(itime,'%Y-%m-%d'))-432000*4)) trainData = BPnet.getData(ishare, '2014-05-22',itime) trainDataR = BPnet.getDataR(ishare, '2014-05-22',itime) testData = BPnet.getData(ishare, backwardtime,itime) try: print(testData) testData = testData[-1] print(testData) nn = BPnet.BPNeuralNetwork() nn.setup(11, 5, 1) nn.train(trainData, trainDataR, 10000, 0.05, 0.1) value = nn.predict(testData) newlist= pd.DataFrame({'time':itime,"id":ishare,"value":value},index=["0"]) sharelist = sharelist.append(newlist,ignore_index=True) except: pass sharelist=sharelist.sort(columns ='value',ascending=False) sharelist = sharelist[:10] holdList=holdList.append(sharelist,ignore_index=True) newcount+=1 print(holdList)
总结
以上就是本文关于神经网络python源码分享的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:
神经网络理论基础及Python实现详解
Python语言实现百度语音识别API的使用实例
Python通过matplotlib绘制动画简单实例
如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
暂无评论...
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
2024年11月26日
2024年11月26日
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]