-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathchan_lun_util.py
447 lines (366 loc) · 16.3 KB
/
chan_lun_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# -*- coding: utf-8 -*-
from datetime import datetime
# from merge_line_dto import MergeLineDTO
# from k_line_dto import KLineDTO
# from sql_lite_util import *
__author__ = 'imspzero'
# K线DTO
class KLineDTO(object):
day = datetime.now()
begin_time = datetime.now()
end_time = datetime.now()
open = 0.0
high = 0.0
low = 0.0
close = 0.0
def __init__(self, day, begin_time, end_time, open, high, low, close):
self.day = day
self.begin_time = begin_time
self.end_time = end_time
self.open = open
self.high = high
self.low = low
self.close = close
def __str__(self):
return "(" + self.day.strftime('%Y-%m-%d %H:%M:%S') + ", " \
+ self.begin_time.strftime('%Y-%m-%d %H:%M:%S') + ", " \
+ self.end_time.strftime('%Y-%m-%d %H:%M:%S') + ")"
# --------------------------------
# 合并后的K线DTO
class MergeLineDTO(object):
memberList = []
stick_num = 0
begin_time = datetime.now()
end_time = datetime.now()
high = 0.0
low = 0.0
is_peak = 'N'
is_bottom = 'N'
member_list = [] # KLineDTO[]
def __init__(self, stick_num, begin_time, end_time, high, low, is_peak, is_bottom):
self.stick_num = stick_num
self.begin_time = begin_time
self.end_time = end_time
self.high = high
self.low = low
self.is_peak = is_peak
self.is_bottom = is_bottom
def __str__(self):
return "(" + self.begin_time.strftime('%Y-%m-%d %H:%M:%S') + ", " \
+ self.end_time.strftime('%Y-%m-%d %H:%M:%S') + ")"
# --------------------------------
def set_peak_and_bottom_flag(merge_line_list):
# 标记顶和底
# []MergeLineDTO
if len(merge_line_list) < 3:
return
# 顶和底,暂不考虑是否公用k线
i = 1
while i < len(merge_line_list) - 1:
first_dto = merge_line_list[i - 1]
middle_dto = merge_line_list[i]
last_dto = merge_line_list[i + 1]
if middle_dto.high > max(first_dto.high, last_dto.high) \
and middle_dto.low > max(first_dto.low, last_dto.low):
middle_dto.is_peak = 'Y'
if middle_dto.high < min(first_dto.high, last_dto.high) \
and middle_dto.low < min(first_dto.low, last_dto.low):
middle_dto.is_bottom = 'Y'
i += 1
def is_inclusive(merge_line_dto, high_price, low_price):
# 判断是否存在包含关系
# MergeLineDTO, float, float
if (merge_line_dto.high >= high_price and merge_line_dto.low <= low_price) \
or (merge_line_dto.high <= high_price and merge_line_dto.low >= low_price):
return True
return False
def is_down(merge_line_dto, high_price, low_price):
# 是否方向向下
# MergeLineDTO, float, float
if merge_line_dto.high > high_price and merge_line_dto.low > low_price:
return True
return False
def is_up(merge_line_dto, high_price, low_price):
# 是否方向向上
# MergeLineDTO, float, float
if merge_line_dto.high < high_price and merge_line_dto.low < low_price:
return True
return False
def merge_k_line(merge_line_dto, k_line_dto, trend):
# 合并K线
# MergeLineDTO, KLineDTO, string
if trend == 'up':
merge_line_dto.high = max(merge_line_dto.high, k_line_dto.high)
merge_line_dto.low = max(merge_line_dto.low, k_line_dto.low)
merge_line_dto.end_time = k_line_dto.end_time
if trend == 'down':
merge_line_dto.high = min(merge_line_dto.high, k_line_dto.high)
merge_line_dto.low = min(merge_line_dto.low, k_line_dto.low)
merge_line_dto.end_time = k_line_dto.end_time
merge_line_dto.end_time = k_line_dto.end_time
merge_line_dto.member_list.append(k_line_dto)
merge_line_dto.stick_num += 1
def find_peak_and_bottom(k_line_list, begin_trend): # []KLineDTO
# 寻找真正的顶和底
# []KLineDTO, string
k_line_dto = k_line_list[0]
# init for the first mergeLine
merge_line_dto = MergeLineDTO(1, k_line_dto.begin_time, k_line_dto.end_time,
k_line_dto.high, k_line_dto.low, 'N', 'N')
merge_line_dto.member_list = []
merge_line_dto.member_list.append(k_line_dto)
# new merge_line_list, and this is the return result
merge_line_list = [merge_line_dto]
trend = begin_trend
i = 1
while i < len(k_line_list):
today_k_line_dto = k_line_list[i]
last_m_line_dto = merge_line_list[len(merge_line_list) - 1]
if is_inclusive(last_m_line_dto, today_k_line_dto.high, today_k_line_dto.low):
# 假如存在包含关系,合并K线
merge_k_line(last_m_line_dto, today_k_line_dto, trend)
else:
if is_up(last_m_line_dto, today_k_line_dto.high, today_k_line_dto.low):
trend = "up"
elif is_down(last_m_line_dto, today_k_line_dto.high, today_k_line_dto.low):
trend = "down"
this_mline_dto = MergeLineDTO(1, today_k_line_dto.begin_time, today_k_line_dto.end_time,
today_k_line_dto.high, today_k_line_dto.low, 'N', 'N')
this_mline_dto.member_list = []
this_mline_dto.member_list.append(today_k_line_dto)
merge_line_list.append(this_mline_dto)
# 处理顶底分型
set_peak_and_bottom_flag(merge_line_list)
i += 1
# print(merge_line_list)
# 输出检查
'''
for m_line_dto in merge_line_list:
print(m_line_dto.begin_time.strftime('%Y-%m-%d %H:%M:%S') + " -- " +
m_line_dto.end_time.strftime('%Y-%m-%d %H:%M:%S') + "**" +
m_line_dto.is_peak + "**" + m_line_dto.is_bottom + "**" +
str(m_line_dto.stick_num))
'''
return merge_line_list
def fen_bi(merge_line_list):
# 分笔
# MergeLineDTO[]
point_flag_list = [False] * len(merge_line_list)
# print(str(point_flag_list[0])+" " + str(point_flag_list[4]))
# 1.预处理,处理后相邻的都是非同一种分型
last_point_m_line_dto = None
for index in range(len(merge_line_list)):
this_m_line_dto = merge_line_list[index]
if this_m_line_dto.is_peak == 'N' \
and this_m_line_dto.is_bottom == 'N':
continue
if last_point_m_line_dto is None:
last_point_m_line_dto = this_m_line_dto
point_flag_list[index] = True
continue
if last_point_m_line_dto.is_peak == 'Y' and this_m_line_dto.is_peak == 'Y':
# 同为顶,取最高的
if this_m_line_dto.high >= last_point_m_line_dto.high:
last_point_m_line_dto = this_m_line_dto
point_flag_list[index] = True
elif last_point_m_line_dto.is_bottom == 'Y' and this_m_line_dto.is_bottom == 'Y':
# 同为底,取最低的
if this_m_line_dto.low <= last_point_m_line_dto.low:
last_point_m_line_dto = this_m_line_dto
point_flag_list[index] = True
else:
last_point_m_line_dto = this_m_line_dto
point_flag_list[index] = True
# 2.动态规划,找出任意[k,k+i]是否能成一笔
point_index_list = []
for index in range(len(point_flag_list)):
if point_flag_list[index]:
point_index_list.append(index)
point_index_matrix = [[False] * len(point_index_list) for i in range(len(point_index_list))]
# print(str(point_index_matrix[0][0]) + " " +
# str(point_index_matrix[len(point_index_list)-1][len(point_index_list)-1]))
find_valid_point_by_dp(merge_line_list, point_index_list, point_index_matrix)
# 3.根据上一步得到的结果,得出最后合理的分型
result_array = [False] * len(point_index_list)
has_result = False
index = len(point_index_list) - 1
while index > 0:
for result in result_array:
result = False
has_result = check_final_fen_bi(merge_line_list, point_index_list,
point_index_matrix, result_array, index)
if has_result:
break
index -= 1
# 4.输出分笔结果
print("分笔结果 : " + str(has_result))
if not has_result:
print("按照目前的划分规则,没有找到分笔的结果.")
for i in range(len(result_array)):
if result_array[i]:
m_line_dto = merge_line_list[point_index_list[i]]
if m_line_dto.is_peak == 'Y':
print(m_line_dto.begin_time.strftime('%Y-%m-%d %H:%M:%S') + "\t" +
m_line_dto.end_time.strftime('%Y-%m-%d %H:%M:%S') + "\t" +
"合并[" + str(m_line_dto.stick_num) + "]条K线" + "\t" +
"顶[" + str(m_line_dto.low) + "][" + str(m_line_dto.high) + "]")
if m_line_dto.is_bottom == 'Y':
print(m_line_dto.begin_time.strftime('%Y-%m-%d %H:%M:%S') + "\t" +
m_line_dto.end_time.strftime('%Y-%m-%d %H:%M:%S') + "\t" +
"合并[" + str(m_line_dto.stick_num) + "]条K线" + "\t" +
"底[" + str(m_line_dto.low) + "][" + str(m_line_dto.high) + "]")
return has_result, result_array, point_index_list
def check_final_fen_bi(merge_line_list, point_index_list,
point_index_matrix, result_array, end_index):
# 查看最终正确的笔划分
# MergeLineDTO[], boolean[], boolean[][],boolean[],integer
return search_final_fen_bi(merge_line_list, point_index_list,
point_index_matrix, result_array,
0, end_index)
def search_final_fen_bi(merge_line_list, point_index_list,
point_index_matrix, result_array, index, end_index):
# 递归查找查看最终正确的笔划分
# MergeLineDTO[], boolean[], boolean[][],boolean[],integer,integer
if index == end_index:
return True
else:
i = index + 1
while i <= end_index:
if point_index_matrix[index][i]:
result_array[index] = True
result_array[i] = True
if search_final_fen_bi(merge_line_list, point_index_list,
point_index_matrix, result_array,
i, end_index):
return True
else:
result_array[index] = False
result_array[i] = False
i += 1
return False
def find_valid_point_by_dp(merge_line_list, point_index_list, point_index_matrix):
# 通过动态规划寻找有效的分型
# 通过动态规划,查找局部解,结果在pointIndexMatrix中
# MergeLineDTO[], boolean[], boolean[][]
distance = 1
while distance < len(point_index_list):
# 取奇数,经过之前第一阶段的预处理,间隔为偶数的都是同类分型
process_by_distance(merge_line_list, point_index_list, point_index_matrix, distance)
distance += 2
def process_by_distance(merge_line_list, point_index_list, point_index_matrix, distance):
# 处理间隔为dist的分型组合,是否能成一笔
# MergeLineDTO[], boolean[], boolean[][], integer
index = 0
while index < len(point_index_list) - distance:
check_result = check_2point_is_multi_line(merge_line_list,
point_index_list,
point_index_matrix,
index, index + distance)
if check_result:
point_index_matrix[index][index + distance] = False
else:
if validate_peak_and_bottom(merge_line_list,
point_index_list[index],
point_index_list[index + distance]):
point_index_matrix[index][index + distance] = True
else:
point_index_matrix[index][index + distance] = False
index += 1
def check_2point_is_multi_line(merge_line_list, point_index_list,
point_index_matrix, start_index, end_index):
# 递归处理,逐步检查[startIndex,endIndex]是否能划分成多笔
# MergeLineDTO[], boolean[], boolean[][], integer, integer
if start_index == end_index:
return True
else:
index = start_index + 1
while index <= end_index:
if point_index_matrix[start_index][index]:
boolean_result = check_2point_is_multi_line(merge_line_list,
point_index_list,
point_index_matrix,
index, end_index)
if boolean_result:
return True
index += 1
return False
def validate_peak_and_bottom(merge_line_list, start_index, end_index):
# 校验是否满足一笔
# MergeLineDTO[], integer, integer
start_m_line_dto = merge_line_list[start_index]
end_m_line_dto = merge_line_list[end_index]
# 1.不满足顶必须接着底、或底必须接着顶
if start_index == 0:
if end_m_line_dto.is_peak != 'N' and end_m_line_dto.is_bottom != 'N':
return False
elif (start_m_line_dto.is_peak == 'Y' and end_m_line_dto.is_peak == 'Y') \
or (start_m_line_dto.is_bottom == 'Y' and end_m_line_dto.is_bottom == 'Y'):
return False
# 2.顶分型与底分型经过包含处理后,不允许共用K线
if end_index - start_index < 3:
return False
# 3.顶分型中最高K线和底分型的最低K线之间(不包括这两K线),不考虑包含关系,至少有3根(包括3根)以上K线
k_line_number = 0
for index in range(len(start_m_line_dto.member_list)):
k_line_dto = start_m_line_dto.member_list[index]
if start_m_line_dto.is_peak == 'Y' and start_m_line_dto.high == k_line_dto.high:
# 顶,并且是顶元素
k_line_number += (len(start_m_line_dto.member_list) - index - 1)
break
elif start_m_line_dto.is_bottom == 'Y' and start_m_line_dto.low == k_line_dto.low:
# 底,并且是底元素
k_line_number += (len(start_m_line_dto.member_list) - index - 1)
break
for index in range(len(end_m_line_dto.member_list)):
k_line_dto = end_m_line_dto.member_list[index]
if end_m_line_dto.is_bottom == 'Y' and end_m_line_dto.low == k_line_dto.low:
k_line_number += index
break
elif end_m_line_dto.is_peak == 'Y' and end_m_line_dto.high == k_line_dto.high:
k_line_number += index
break
# 分型中间元素的k线合计
index = start_index + 1
while index < end_index:
m_line_dto = merge_line_list[index]
k_line_number += m_line_dto.stick_num
index += 1
if k_line_number < 3:
return False
# 4.顶底分别是笔中(包括组成分型的元素)的最高和最低
peak_dto = None
bottom_dto = None
if start_m_line_dto.is_peak == 'Y':
peak_dto = start_m_line_dto
bottom_dto = end_m_line_dto
else:
peak_dto = end_m_line_dto
bottom_dto = start_m_line_dto
index = 0 if start_index == 0 else start_index - 1
while index <= end_index + 1:
m_line_dto = merge_line_list[index]
# 存在更高的点位
if m_line_dto.high > peak_dto.high \
and m_line_dto.begin_time != peak_dto.begin_time:
return False
if m_line_dto.low < bottom_dto.low \
and m_line_dto.begin_time != bottom_dto.begin_time:
return False
index += 1
# 5.不允许中间有其他的顶和底
# 6.或许还需要判断顶底的区间是否不能重叠
return True
'''
def run_test():
# 测试
# 取原始数据
# k_line_list = get_stock_30min_data_by_time("601318", "2015-12-21 10:00:00", "2016-04-02 13:30:00")
k_line_list = get_stock_week_data_by_time("999999", "2009-07-17", "2015-05-15")
print(len(k_line_list))
# k_line_list = get_stock_30min_data_by_time("999999", "2015-03-10", "2015-05-16")
print("---------------------------")
# 分型
merge_line_list = find_peak_and_bottom(k_line_list, "down")
fen_bi(merge_line_list)
'''
# run_test()