
爬虫被网站封 IP 是家常便饭。每次被封就去手动换 IP 效率太低正确的做法是搭建一个代理 IP 池——自动采集、自动验证、自动切换。一、代理池的整体架构代理来源采集/购买 ↓ 存入代理池Redis/数据库 ↓ 定时验证检测可用性、延迟、匿名度 ↓ API 接口爬虫按需获取可用代理 ↓ 爬虫使用被封后自动标记并换下一个二、代理来源1. 采集免费代理importrequestsfrombs4importBeautifulSoupimporttimedeffetch_89ip():从 89IP 采集免费代理proxies[]urlhttps://www.89ip.com/try:resprequests.get(url,timeout10)soupBeautifulSoup(resp.text,html.parser)rowssoup.select(table tr)[1:]# 跳过表头forrowinrows:cellsrow.select(td)iflen(cells)2:ipcells[0].text.strip()portcells[1].text.strip()proxies.append(f{ip}:{port})exceptExceptionase:print(f采集失败:{e})returnproxies免费代理的特点90% 不可用需要验证速度慢平均 3-5 秒匿名度低适合学习和小规模爬虫2. 购买付费代理classProxyProvider:代理服务商 API 提取def__init__(self,api_url):self.api_urlapi_urldeffetch(self):从 API 提取代理try:resprequests.get(self.api_url,timeout10)ifresp.status_code200:dataresp.json()return[f{item[ip]}:{item[port]}foritemindata.get(data,[])]exceptExceptionase:print(f提取失败:{e})return[]三、代理池核心实现1. 存储层RedisimportredisimportjsonclassProxyRedis:代理池 Redis 存储def__init__(self,hostlocalhost,port6379,db0):self.clientredis.StrictRedis(hosthost,portport,dbdb)self.keyproxy_pooldefadd(self,proxy,score10):添加代理分数越高越优先使用self.client.zadd(self.key,{proxy:score})defget(self):获取一个可用代理分数最高的resultself.client.zrevrange(self.key,0,0)returnresult[0].decode()ifresultelseNonedefget_all(self):获取所有代理return[p.decode()forpinself.client.zrange(self.key,0,-1)]defremove(self,proxy):移除不可用的代理self.client.zrem(self.key,proxy)defdecrease(self,proxy):降低代理分数连续失败后移除scoreself.client.zscore(self.key,proxy)ifscoreandscore1:self.client.zincrby(self.key,-1,proxy)else:self.remove(proxy)defcount(self):代理数量returnself.client.zcard(self.key)2. 验证层importthreadingimporttimeimportrequestsclassProxyValidator:代理验证器# 测试用 URL建议用稳定的网站TEST_URLS[http://httpbin.org/ip,https://www.baidu.com,]def__init__(self,proxy_redis,timeout5):self.proxy_redisproxy_redis self.timeouttimeoutdefvalidate(self,proxy):验证单个代理是否可用fortest_urlinself.TEST_URLS:try:resprequests.get(test_url,proxies{http:proxy,https:proxy},timeoutself.timeout,)ifresp.status_code200:returnTrueexcept:passreturnFalsedefvalidate_all(self):验证所有代理低分优先验证高分的减少验证频率proxiesself.proxy_redis.get_all()forproxyinproxies:ifself.validate(proxy):print(f✅{proxy}可用)else:self.proxy_redis.decrease(proxy)print(f❌{proxy}不可用已扣分)3. 调度层importscheduleimportthreadingclassProxyPoolScheduler:代理池调度器def__init__(self,proxy_redis,validator,providersNone):self.proxy_redisproxy_redis self.validatorvalidator self.providersprovidersor[]defcollect(self):定时采集print(开始采集代理...)count0forproviderinself.providers:proxiesprovider.fetch()forproxyinproxies:self.proxy_redis.add(proxy)count1print(f采集完成新增{count}个代理)defcheck(self):定时验证print(f开始验证当前代理数:{self.proxy_redis.count()}...)self.validator.validate_all()print(f验证完成剩余代理:{self.proxy_redis.count()})defstart(self):启动定时任务# 每 30 分钟采集一次schedule.every(30).minutes.do(self.collect)# 每 10 分钟验证一次schedule.every(10).minutes.do(self.check)# 先执行一次self.collect()self.check()whileTrue:schedule.run_pending()time.sleep(30)defstart_async(self):异步启动不阻塞主线程threadthreading.Thread(targetself.start,daemonTrue)thread.start()print(代理池调度器已启动后台运行)四、API 接口fromflaskimportFlask,jsonifyimportredis appFlask(__name__)proxy_redisProxyRedis()app.route(/proxy)defget_proxy():获取一个代理proxyproxy_redis.get()ifproxy:returnjsonify({code:200,proxy:proxy})returnjsonify({code:404,message:代理池为空})app.route(/proxy/list)deflist_proxies():获取所有代理proxiesproxy_redis.get_all()returnjsonify({code:200,count:len(proxies),data:proxies})app.route(/proxy/count)defproxy_count():代理数量returnjsonify({code:200,count:proxy_redis.count()})if__name____main__:app.run(port5010)五、在爬虫中使用importrequestsimporttimeclassProxyCrawler:使用代理池的爬虫def__init__(self,proxy_apihttp://localhost:5010/proxy):self.proxy_apiproxy_apidefget_proxy(self):从代理池获取代理try:resprequests.get(self.proxy_api,timeout3)dataresp.json()ifdata[code]200:returndata[proxy]except:passreturnNonedefrequest(self,url,max_retries5):带代理重试的请求foriinrange(max_retries):proxyself.get_proxy()ifnotproxy:print(代理池为空等待...)time.sleep(5)continuetry:resprequests.get(url,proxies{http:proxy,https:proxy},timeout10,headers{User-Agent:Mozilla/5.0},)ifresp.status_code200:returnrespexcept:print(f代理{proxy}不可用换下一个...)returnNonedefcrawl(self,urls):批量爬取forurlinurls:respself.request(url)ifresp:print(f✅{url}爬取成功)else:print(f❌{url}爬取失败)time.sleep(1)# 使用crawlerProxyCrawler()crawler.crawl([https://example.com/page/1,https://example.com/page/2])六、完整搭建流程# 一键启动代理池if__name____main__:# 1. 初始化 Redis 存储redis_storeProxyRedis()# 2. 初始化验证器validatorProxyValidator(redis_store)# 3. 配置代理来源providers[ProxyProvider(https://api.example.com/get_proxy),]# 4. 启动调度器后台运行schedulerProxyPoolScheduler(redis_store,validator,providers)scheduler.start_async()# 5. 启动 API 服务app.run(port5010)七、付费代理说明免费代理只适合学习和测试正式项目建议购买付费代理API。一个月几十块钱省去采集验证的麻烦可用率和速度都有保障。选择时关注以下几点指标说明可用率最好 85% 以上速度平均响应时间 3 秒以内匿名度高匿名不暴露真实 IP协议支持 HTTP/HTTPS提取方式API 接口提取按量或按时付费 觉得有用的话点赞 关注【张老师技术栈】吧每周更新 Java/Python/爬虫 实战干货不让你白来。