【注意】最后更新于 March 4, 2023,文中内容可能已过时,请谨慎使用。
1. 爬取流程
2. 准备工作
2.1 分页网页
@注意:这次爬取的网页数据是通过ajax加载,所以不能直接使用OnHtml抓取。
2.2 编写结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// 对应表中的每一tr
type StockPercentageRow struct {
StockCode string `selector:"td:nth-of-type(2)"`
StockName string `selector:"td:nth-of-type(3)"`
Percentage string `selector:"td:nth-of-type(7)"`
Quantity string `selector:"td:nth-of-type(8)"`
Amount string `selector:"td:nth-of-type(9)"`
}
// 对应整个table
type StockPercentageRowsCrawl struct {
Rows []StockPercentageRow `selector:"tr"`
FundCode string
CutOffDate string
}
|
3. 代码实现
3.1 请求流程图
3.2 抓取入口函数
文件位置:crontab/fund_stock_cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
type FundStockCron struct {
}
// 声明并发等待组
var wg sync.WaitGroup
// 每次任务抓取总数量
var perTaskTotal = 50
// 记录每次任务对应的基金code
var fundCodeChannel = make(chan []string, perTaskTotal)
// 定时任务启动入口
func (c FundStockCron) Run() {
btime := time.Now().UnixMilli()
fmt.Println("基金持仓-股票定时任务准备执行....")
pageNum := 10
totalPage := int(math.Ceil(float64(perTaskTotal) / float64(pageNum)))
// 开启协程分组抓取
// 创建数据通道
var dataChan = make(chan [][]entity.FundStock, perTaskTotal/pageNum)
runWithGoroutine(dataChan, totalPage, pageNum)
// 读取通道,数据入库
saveToDb(dataChan)
fmt.Printf("基金持仓股票-定时任务执行完成,耗时:%vms\n", time.Now().UnixMilli()-btime)
}
|
3.3 开启协程分组抓取
文件位置:crontab/fund_stock_cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
// 开启协程分组抓取
func runWithGoroutine(dataChan chan [][]entity.FundStock, totalPage, pageNum int) {
// 延迟关闭chan
defer close(dataChan)
defer close(fundCodeChannel)
// 开启协程抓取
wg.Add(totalPage)
for i := 1; i <= totalPage; i++ {
page := i
go func() {
// 获取对应页数的code
fundStocks, err := dao.FindNoSyncFundStockByPage(page, pageNum)
if err == nil {
var fundStockList [][]entity.FundStock
var fundCodes []string
for _, val := range fundStocks {
rows := &fund.StockPercentageRowsCrawl{}
rows.CrawlHtml(val.Code)
fundCodes = append(fundCodes, val.Code)
if len(rows.Rows) > 0 {
convertEntity := rows.ConvertEntity()
fundStockList = append(fundStockList, convertEntity)
}
}
// 数据存入通道
dataChan <- fundStockList
fundCodeChannel <- fundCodes
}
// 并发等待组减一
wg.Done()
}()
}
wg.Wait()
}
|
3.4 爬取函数(CrawlHtml
)
文件位置: service/crawl/fund/stock_crawl.go
@注意:这次爬取的网页数据是通过ajax加载,所以不能直接使用OnHtml抓取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
// 爬取信息
func (c *StockPercentageRowsCrawl) CrawlHtml(fundCode string) {
collector := colly.NewCollector(colly.UserAgent(crawl.UserAgent), colly.Async(true))
// 开启限速
err := collector.Limit(&colly.LimitRule{
DomainGlob: "*fundf10.eastmoney.*",
Delay: 500 * time.Millisecond,
RandomDelay: 500 * time.Millisecond,
Parallelism: 20,
})
collector.OnRequest(func(request *colly.Request) {
fmt.Println("url:", request.URL)
})
// 处理返回的数据
collector.OnResponse(func(response *colly.Response) {
// 替换字符串
compile := regexp.MustCompile(`var apidata=\{ content:"(.*)",arryear:`)
matchResult := compile.FindAllStringSubmatch(string(response.Body), -1)
if len(matchResult) == 0 {
return
}
htmlString := matchResult[0][1]
htmlString = strings.ReplaceAll(htmlString, "%", "")
htmlString = strings.ReplaceAll(htmlString, ",", "")
doc, err := goquery.NewDocumentFromReader(bytes.NewBuffer([]byte(htmlString)))
if err != nil {
return
}
docSelection := doc.Find("div[class='box']").First()
e := &colly.HTMLElement{
DOM: docSelection.Find("table"),
}
err = e.Unmarshal(c)
if err != nil {
global.GvaLogger.Error("爬虫解析失败", zap.String("error", err.Error()))
return
}
// 过滤header
if len(c.Rows) > 0 && c.Rows[0].StockCode == "" {
c.Rows = c.Rows[1:]
}
// 获取持仓季度时间信息
c.CutOffDate = docSelection.Find("h4 label").Eq(1).Find("font").Text()
// 补充额外信息
c.FundCode = fundCode
})
//topline=30 代表持仓前30的股票
err = collector.Visit(fmt.Sprintf("https://fundf10.eastmoney.com/FundArchivesDatas.aspx?type=jjcc&code=%s&topline=30", fundCode))
if err != nil {
global.GvaLogger.Sugar().Errorf("CrawlHtml error:%s", err)
}
collector.Wait()
}
|
3.5 数据清洗(ConvertEntity
)
文件位置: service/crawl/fund/stock_crawl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func (c StockPercentageRowsCrawl) ConvertEntity() []entity.FundStock {
var fundStocks []entity.FundStock
if len(c.Rows) < 1 {
return []entity.FundStock{}
}
for _, row := range c.Rows {
item := entity.FundStock{
FundCode: c.FundCode,
StockCode: row.StockCode,
StockName: row.StockName,
CutOffDate: c.CutOffDate,
}
// 提取交易所信息
// 提取交易所信息
compile := regexp.MustCompile(`com\/([a-zA-Z]+)\d+\.html`)
stringSubMatch := compile.FindAllStringSubmatch(row.StockHref, -1)
if stringSubMatch != nil {
item.StockExchange = strings.ToUpper(stringSubMatch[0][1])
}
// 字符串转浮点型
item.Percentage, _ = strconv.ParseFloat(row.Percentage, 64)
item.Quantity, _ = strconv.ParseFloat(row.Quantity, 64)
item.Amount, _ = strconv.ParseFloat(row.Amount, 64)
fundStocks = append(fundStocks, item)
}
return fundStocks
}
|
3.6 保存入库(saveToDb
)
文件位置:crontab/fund_stock_cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// 保存入库
func saveToDb(dataChan chan [][]entity.FundStock) {
// 声明基金持仓股票实体列表
fundStockRows := []entity.FundStock{}
// 声明股票实体列表
stockRows := []entity.Stock{}
// 声明股票实体列表
checkExistKey := make(map[string]struct{}, perTaskTotal)
// 遍历
for fundStockGroup := range dataChan {
for _, fundStockList := range fundStockGroup {
for _, fundStock := range fundStockList {
stockCode := fundStock.StockCode
fundStockRows = append(fundStockRows, fundStock)
// 判断是否已经存在
if _, ok := checkExistKey[stockCode]; !ok {
stockRows = append(stockRows, entity.Stock{
Code: fundStock.StockCode,
Name: fundStock.StockName,
ExchangeCode: fundStock.StockExchange,
})
checkExistKey[stockCode] = struct{}{}
}
}
}
}
var codeList []string
for val := range fundCodeChannel {
for _, c := range val {
codeList = append(codeList, c)
}
}
// 入库
if save := global.GvaMysqlClient.Create(fundStockRows); save.Error != nil {
global.GvaLogger.Sugar().Errorf("基金持仓入库失败:%s", save.Error)
}
// 批量更新
if len(codeList) > 0 {
if up := global.GvaMysqlClient.Model(&entity.FundBasis{}).Where("`code` in ?", codeList).
Update("sync_stock", 1); up.Error != nil {
global.GvaLogger.Sugar().Errorf("信息更新失败:%s", up.Error)
}
}
if save := global.GvaMysqlClient.Create(stockRows); save.Error != nil {
global.GvaLogger.Sugar().Errorf("股票信息入库失败:%s", save.Error)
}
}
|
4. 添加定时任务
文件位置:initialize/cron.go
1
2
3
4
5
6
|
// 添加Job任务
func addJob(c *cron.Cron) {
...
// 爬取基金持仓信息信息(每天 20:30)
_, _ = c.AddJob("0 30 20 */1 * *", crontab.FundStockCron{})
}
|
5. 运行效果