C# 示例
using System;
using System.Net;
using System.Net.Http;
using System.Collections.Generic;
namespace ClientProxyDemo
{
    /// <summary>
    /// Demonstrates sending GET and POST requests through an authenticated
    /// HTTP proxy using <see cref="HttpClient"/>.
    /// </summary>
    class TestProxy
    {
        /// <summary>
        /// Builds a proxy-enabled <see cref="HttpClient"/> and runs the GET sample.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // http://host:port, e.g. http://1.2.3.4:7777; host may be a domain name or an IP, port is the proxy port.
            String proxyServer = "http://xxx:xxx";
            String username = "xxx"; // proxy user name
            String password = "xxx"; // proxy password

            var proxy = new WebProxy(proxyServer)
            {
                Credentials = new NetworkCredential(username, password)
            };

            // Both the handler and the client own sockets; dispose them when the demo finishes.
            using (var httpClientHandler = new HttpClientHandler { Proxy = proxy })
            using (var httpClient = new HttpClient(httpClientHandler))
            {
                // Add a custom header that is sent with every request.
                httpClient.DefaultRequestHeaders.Add("Header-Key", "header-vaule");

                TestProxy testProxy = new TestProxy();
                testProxy.testGet(httpClient);
                // testProxy.testPost(httpClient);
            }
        }

        /// <summary>
        /// Sends a GET request to httpbin.org through the given client and prints the response body.
        /// </summary>
        /// <param name="httpClient">Client already configured with the proxy.</param>
        public void testGet(HttpClient httpClient)
        {
            String targetUrl = "http://httpbin.org/get";
            // GetAwaiter().GetResult() surfaces the original exception instead of an AggregateException.
            var httpResult = httpClient.GetStringAsync(targetUrl).GetAwaiter().GetResult();
            Console.WriteLine(httpResult);
        }

        /// <summary>
        /// Sends a form-encoded POST request to httpbin.org through the given client
        /// and prints the status code and response body.
        /// </summary>
        /// <param name="httpClient">Client already configured with the proxy.</param>
        public void testPost(HttpClient httpClient)
        {
            String targetUrl = "http://httpbin.org/post";
            var formData = new List<KeyValuePair<string, string>>
            {
                new KeyValuePair<string, string>("key1", "vaule1"),
                new KeyValuePair<string, string>("key2", "vaule2")
            };

            // The request content and the response message both hold resources that must be released.
            using (var formContent = new FormUrlEncodedContent(formData))
            using (var responseMsg = httpClient.PostAsync(targetUrl, formContent).GetAwaiter().GetResult())
            {
                var httpResult = responseMsg.Content.ReadAsStringAsync().GetAwaiter().GetResult();
                Console.WriteLine((int)responseMsg.StatusCode);
                Console.WriteLine(httpResult);
            }
        }
    }
}
NodeJS 示例
// Node.js: send a plain-HTTP request through an HTTP proxy.
var http = require('http')

// Proxy credentials in the form "username:password".
var proxyAuth = '用户名:密码';

var opt = {
    host: 'tunnel.alicloudecs.com', // proxy server IP or domain name
    port: '500', // proxy server port
    method: 'GET', // HTTP method to send
    path: 'http://myip.top', // absolute URL of the target, as required when talking to a proxy
    headers: {
        // The `auth` option of http.request() produces an `Authorization` header, which
        // addresses the origin server. A proxy authenticates via `Proxy-Authorization`.
        'Proxy-Authorization': 'Basic ' + Buffer.from(proxyAuth).toString('base64'),
        // Without this, Node derives Host from `host`/`port`, i.e. the proxy itself.
        Host: 'myip.top'
    }
}

// Collect the response body and print it once the response has ended.
var body = '';
var req = http.request(opt, function (res) {
    console.log("Got response: " + res.statusCode);
    // Decode as UTF-8 at the stream level so multi-byte characters split across chunks stay intact.
    res.setEncoding('utf8');
    res.on('data', function (d) {
        body += d;
    }).on('end', function () {
        console.log(res.headers)
        console.log(body)
    });
}).on('error', function (e) {
    console.log("Got error: " + e.message);
})
req.end();
// Node.js: request an http/https URL through a proxy using node-fetch + https-proxy-agent.
const fetch = require("node-fetch");
const HttpsProxyAgent = require('https-proxy-agent');

// Target URL to request.
let url="https://www.baidu.com/s?cl=3&tn=baidutop10&fr=top1000&wd=%E9%AB%98%E8%80%83&rsv_idx=2&rsv_dl=fyb_n_homepage&hisfilter=1";
// Proxy server address.
let ip='58.218.205.48';
// Proxy server port.
let port='500';
let username = 'your_username';
let password = 'your_password';

// Credentials go into the proxy URL itself: the agent derives the Proxy-Authorization
// header from the URL's user-info part, whereas a second constructor argument carrying
// `auth` is not used for authentication. Percent-encode so that characters such as
// '@' or ':' in the credentials cannot corrupt the URL.
// Note the proxy URL scheme is `http://` even when the target is https.
let proxyUrl = "http://" + encodeURIComponent(username) + ":" + encodeURIComponent(password) + "@" + ip + ":" + port;

fetch(url, {
    method: 'GET',
    // body: null,
    agent: new HttpsProxyAgent(proxyUrl)
}).then(function (res) {
    console.log("Response Headers ============ ");
    res.headers.forEach(function(v,i,a) {
        console.log(i+" : "+v);
    });
    return res.text();
}).then(function (res) {
    console.log("Response Body ============ ");
    console.log(res);
}).catch(function (err) {
    // Without a handler, a connection or proxy failure becomes an unhandled promise rejection.
    console.log("Got error: " + err.message);
});
Go 示例
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
)
// socksproxy requests http://myip.top through a SOCKS5 proxy and prints the
// response body prefixed with "socks5:".
func socksproxy() {
	// The URL scheme selects the proxy protocol; net/http only speaks SOCKS5 to the
	// proxy when the scheme is "socks5" (an "http" scheme means a plain HTTP proxy).
	urlproxy, err := url.Parse("socks5://用户名:密码@测试ip:端口")
	if err != nil {
		// The error text would echo the URL including the password, so it is not printed.
		// Bail out: a nil proxy URL would silently fall back to a direct connection.
		fmt.Println("socks5: invalid proxy URL")
		return
	}
	client := &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyURL(urlproxy),
		},
	}
	rqt, err := http.NewRequest("GET", "http://myip.top", nil)
	if err != nil {
		println("接口获取IP失败!")
		return
	}
	// Send the request; on failure response is nil, so it must not be touched.
	response, err := client.Do(rqt)
	if err != nil {
		fmt.Println("socks5: request failed:", err)
		return
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println("socks5: reading response failed:", err)
		return
	}
	fmt.Println("socks5:", string(body))
}
// httpproxy requests http://myip.top through an HTTP proxy and prints the
// response body prefixed with "http:".
func httpproxy() {
	urlproxy, err := url.Parse("http://用户名:密码@测试ip:端口")
	if err != nil {
		// The error text would echo the URL including the password, so it is not printed.
		// Bail out: a nil proxy URL would silently fall back to a direct connection.
		fmt.Println("http: invalid proxy URL")
		return
	}
	client := &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyURL(urlproxy),
		},
	}
	rqt, err := http.NewRequest("GET", "http://myip.top", nil)
	if err != nil {
		println("接口获取IP失败!")
		return
	}
	// Send the request; on failure response is nil, so it must not be touched.
	response, err := client.Do(rqt)
	if err != nil {
		fmt.Println("http: request failed:", err)
		return
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println("http: reading response failed:", err)
		return
	}
	fmt.Println("http:", string(body))
}
// httplocal requests http://myip.top directly (no proxy configured on the client)
// and prints the response body prefixed with "本机:".
func httplocal() {
	client := &http.Client{}
	rqt, err := http.NewRequest("GET", "http://myip.top", nil)
	if err != nil {
		println("接口获取IP失败!")
		return
	}
	// Send the request; on failure response is nil, so it must not be touched.
	response, err := client.Do(rqt)
	if err != nil {
		fmt.Println("本机: request failed:", err)
		return
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println("本机: reading response failed:", err)
		return
	}
	fmt.Println("本机:", string(body))
}
// main runs the three samples in order: direct request, HTTP proxy, SOCKS5 proxy.
func main() {
	samples := []func(){httplocal, httpproxy, socksproxy}
	for _, sample := range samples {
		sample()
	}
}
PHP curl 模块示例
<?php
// PHP cURL request through an HTTP or SOCKS5 proxy.

// Target page to request.
$targetUrl = "http://baidu.com";
// Proxy server, including the credentials: username:password@host:port
$proxyServer = "username:password@proxy.example.com:8080";

$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $targetUrl);
curl_setopt($ch, CURLOPT_HTTPPROXYTUNNEL, false);
// NOTE(review): this disables TLS certificate verification for https targets.
// Kept as in the original sample; remove it unless the proxy setup requires it.
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);

// Select the proxy protocol.
// curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP); // HTTP proxy
curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5); // SOCKS5 proxy
curl_setopt($ch, CURLOPT_PROXY, $proxyServer);

// Authentication scheme used towards the proxy.
curl_setopt($ch, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
curl_setopt($ch, CURLOPT_USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727;)");
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 3);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);
// Include the response headers in the returned string.
curl_setopt($ch, CURLOPT_HEADER, true);
// Return the response from curl_exec() instead of printing it directly.
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

$result = curl_exec($ch);
if ($result === false) {
    // curl_exec() returns false on failure; report why instead of dumping a bare `false`.
    echo "cURL error (" . curl_errno($ch) . "): " . curl_error($ch) . PHP_EOL;
} else {
    var_dump($result);
}
curl_close($ch);
Python 示例
Python Requests 使用示例
#coding=utf-8
from urllib.parse import quote

import requests

# Target URL to request
targetUrl = "https://myip.top"

# Proxy server settings
proxyHost = "1.1.1.1"  # proxy IP
proxyPort = "1234"  # proxy port
proxyUser = "1234"  # proxy account
proxyPass = "123456"  # proxy password

# Python2
#proxyMeta = "http://%(user)s:%(password)s@%(host)s:%(port)s" % {
#    "user" : proxyUser,
#    "password" : proxyPass,
#    "host" : proxyHost,
#    "port" : proxyPort,
#}

# Python3
# Percent-encode the credentials so characters such as '@', ':' or '/' in the
# user name or password cannot corrupt the proxy URL.
proxyMeta = f"http://{quote(proxyUser, safe='')}:{quote(proxyPass, safe='')}@{proxyHost}:{proxyPort}"

# Use the same proxy for both http and https targets.
proxies = {
    "http" : proxyMeta,
    "https" : proxyMeta
}

# requests has no default timeout; without one a stalled proxy blocks forever.
resp = requests.get(targetUrl, proxies=proxies, timeout=10)

print("Use proxies:", proxies)
print("Respose status code: ", resp.status_code)
print("Respose content: ", resp.text)
Python aiohttp 模块示例
import asyncio
import json
import aiohttp
from aiohttp_socks import ProxyConnector
# Proxy connection settings; verify() joins them into the proxy URL
# "<method>://<username>:<password>@<host>:<port>" passed to ProxyConnector.from_url.
proxy_method = "http"  # URL scheme of the proxy
proxy_username = "test"  # proxy account name
proxy_password = "test"  # proxy account password
proxy_host = "1.1.1.1"  # proxy server address
proxy_port = "8080"  # proxy server port
async def verify(_) -> str:
    """Request https://myip.top through the configured proxy and return the exit IP.

    The single positional argument is ignored; it only lets callers pass a
    per-task value. Returns the IP string reported by the service, or "" when
    the request fails, the body cannot be parsed, or no IP is reported.
    """
    timeout = aiohttp.ClientTimeout(total=5)
    proxyConn = ProxyConnector.from_url(
        f"{proxy_method}://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}"
    )
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.1 Safari/605.1.15",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Connection": "close"
    }
    # With a random-exit tunnel the session must be closed after every request;
    # otherwise connections pile up and the exit IP stays the same. The
    # `async with` block closes the session on every exit path, so no explicit
    # session.close() is needed (the original call sat after the returns and
    # could never run).
    async with aiohttp.ClientSession(timeout=timeout, connector=proxyConn, headers=headers) as session:
        try:
            async with session.get(url='https://myip.top', ssl=False) as response:
                result = json.loads(await response.text())
                ip = result["ip"]
                if not ip:
                    return ""
                # city/isp are only informational; their absence must not
                # discard an otherwise valid IP.
                print("ip", ip, result.get("city"), result.get("isp"))
                return ip
        except Exception as e:
            print("error:", e)
            return ""
async def verify_s(sem, proxy):
    """Run verify(proxy) while holding `sem`, so `sem` bounds how many run at once."""
    await sem.acquire()
    try:
        outcome = await verify(proxy)
        return outcome
    finally:
        # Always hand the slot back, whether verify() returned or raised.
        sem.release()
async def run():
    """Run 100 proxy checks, at most 5 at a time, and print success and unique-IP statistics."""
    task_num = 100
    sem = asyncio.Semaphore(5)
    result = await asyncio.gather(*(verify_s(sem, index) for index in range(task_num)))

    # Every non-empty result is the exit IP of one successful check.
    ip = [exit_ip for exit_ip in result if exit_ip]
    ok = len(ip)

    print(f'总测试数量: {task_num} 成功: {ok} 成功率: {int(ok / task_num * 100)}%')
    print(f"总请求数:{task_num} 去重IP数: {len(set(ip))}")


if __name__ == '__main__':
    asyncio.run(run())
Python selenium 模块示例
#Python selenium http/socks5:
#coding=utf-8
# NOTE(review): webdriver.PhantomJS is presumably unavailable in recent Selenium
# releases; confirm the installed Selenium version still ships it.
from selenium import webdriver

# Proxy server settings
proxyHost = "ip"
proxyPort = "port"
proxyUser = "username"
proxyPass = "password"
proxyType = 'http'  # or 'socks5'

# PhantomJS command-line arguments: the proxy address and the proxy credentials
# are separate flags (--proxy takes host:port, --proxy-auth takes user:password).
service_args = [
    "--proxy-type=%s" % proxyType,
    "--proxy=%(host)s:%(port)s" % {
        "host" : proxyHost,
        "port" : proxyPort,
    },
    "--proxy-auth=%(user)s:%(pass)s" % {
        "user" : proxyUser,
        "pass" : proxyPass,
    },
]

# Target page to visit
targetUrl = "http://baidu.com"

driver = webdriver.PhantomJS(service_args=service_args)
try:
    driver.get(targetUrl)
    # print() as a function: the Python 2 print statement is a syntax error on Python 3.
    print(driver.title)
    print(driver.page_source)
finally:
    # Always stop the browser process, even when loading the page fails.
    driver.quit()
更新时间:2023-12-21 10:42