C# 示例

using System;
using System.Net;
using System.Net.Http;
using System.Collections.Generic;

namespace ClientProxyDemo
{
    /// <summary>
    /// Console demo that sends HTTP requests through an authenticated proxy
    /// and prints the responses.
    /// </summary>
    class TestProxy
    {
        /// <summary>
        /// Builds an <see cref="HttpClient"/> routed through the proxy and runs the GET demo.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // http://host:port, e.g. http://1.2.3.4:7777; host may be a domain name or an IP, port is the proxy port
            string proxyServer = "http://xxx:xxx";
            string username = "xxx"; // proxy user name
            string password = "xxx"; // proxy password

            var proxy = new WebProxy(proxyServer)
            {
                Credentials = new NetworkCredential(username, password)
            };
            var httpClientHandler = new HttpClientHandler
            {
                Proxy = proxy
            };

            // Disposing the client also disposes the handler it was constructed with.
            using (var httpClient = new HttpClient(httpClientHandler))
            {
                // Extra header sent with every request
                httpClient.DefaultRequestHeaders.Add("Header-Key", "header-vaule");

                var testProxy = new TestProxy();
                testProxy.testGet(httpClient);
                // testProxy.testPost(httpClient);
            }
        }


        /// <summary>
        /// Sends a GET request through the given client and prints the response body.
        /// </summary>
        /// <param name="httpClient">Client already configured with the proxy.</param>
        public void testGet(HttpClient httpClient)
        {
            string targetUrl = "http://httpbin.org/get";

            // GetAwaiter().GetResult() still blocks (Main is synchronous) but rethrows the
            // original exception instead of wrapping it in an AggregateException like .Result.
            string httpResult = httpClient.GetStringAsync(targetUrl).GetAwaiter().GetResult();
            Console.WriteLine(httpResult);
        }

        /// <summary>
        /// Sends a form-encoded POST request through the given client and prints
        /// the status code and response body.
        /// </summary>
        /// <param name="httpClient">Client already configured with the proxy.</param>
        public void testPost(HttpClient httpClient)
        {
            string targetUrl = "http://httpbin.org/post";
            var formData = new List<KeyValuePair<string, string>>
            {
                new KeyValuePair<string, string>("key1", "vaule1"),
                new KeyValuePair<string, string>("key2", "vaule2")
            };

            using (var formContent = new FormUrlEncodedContent(formData))
            using (HttpResponseMessage responseMsg = httpClient.PostAsync(targetUrl, formContent).GetAwaiter().GetResult())
            {
                string httpResult = responseMsg.Content.ReadAsStringAsync().GetAwaiter().GetResult();
                Console.WriteLine((int)responseMsg.StatusCode);
                Console.WriteLine(httpResult);
            }
        }
    }
}

NodeJS 示例

// Node: request a plain-HTTP URL through the proxy with the built-in http module
var http = require('http');

var opt = {
    // proxy server IP or domain name
    host: 'tunnel.alicloudecs.com',
    // proxy server port
    port: '500',
    // proxy credentials (username:password)
    // NOTE(review): Node turns `auth` into an `Authorization` header, not
    // `Proxy-Authorization` — confirm the proxy accepts credentials this way.
    auth: '用户名:密码',
    // request method
    method: 'GET',
    // absolute URL of the target page
    path: 'http://myip.top',
};

// Accumulates the response body across 'data' events
var body = '';

// Logs the status code, then the headers and full body once the response ends
function handleResponse(res) {
    console.log("Got response: " + res.statusCode);
    res.on('data', function (chunk) {
        body += chunk;
    });
    res.on('end', function () {
        console.log(res.headers);
        console.log(body);
    });
}

// Logs request-level failures
function handleError(e) {
    console.log("Got error: " + e.message);
}

var req = http.request(opt, handleResponse);
req.on('error', handleError);
req.end();



// Node: request an HTTP/HTTPS URL through the proxy with node-fetch + https-proxy-agent
const fetch = require("node-fetch");
// Older https-proxy-agent releases export the constructor itself; newer ones export it
// as a named member. Accept both shapes.
const httpsProxyAgentModule = require('https-proxy-agent');
const HttpsProxyAgent = httpsProxyAgentModule.HttpsProxyAgent || httpsProxyAgentModule;

// Target URL
let url="https://www.baidu.com/s?cl=3&tn=baidutop10&fr=top1000&wd=%E9%AB%98%E8%80%83&rsv_idx=2&rsv_dl=fyb_n_homepage&hisfilter=1";
// Proxy server address
let ip='58.218.205.48';
// Proxy server port
let port='500';
let username = 'your_username';
let password = 'your_password';

// The credentials go into the proxy URL itself: the agent derives the
// Proxy-Authorization header from the URL's userinfo, whereas an `auth` field in a
// second constructor argument is not used for proxy authentication.
// encodeURIComponent keeps characters such as '@' or ':' in the credentials from
// corrupting the URL. Note the scheme is `http://` even for HTTPS targets.
let proxyUrl = "http://" + encodeURIComponent(username) + ":" + encodeURIComponent(password) + "@" + ip + ":" + port;

fetch(url, {
    method: 'GET',
    // body: null,
    agent: new HttpsProxyAgent(proxyUrl)
}).then(function (res) {
    console.log("Response Headers ============ ");
    res.headers.forEach(function (value, name) {
        console.log(name+" : "+value);
    });
    return res.text();
}).then(function (text) {
    console.log("Response Body ============ ");
    console.log(text);
}).catch(function (err) {
    // Surface connection/proxy failures instead of leaving an unhandled rejection
    console.log("Request failed: " + err.message);
});

Go 示例

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "net/url"
)

// socksproxy requests http://myip.top through the SOCKS5 proxy and prints the
// response body. Failures are reported and the function returns early.
func socksproxy() {
    // http.Transport picks the proxy protocol from the URL scheme, so a SOCKS5
    // proxy needs socks5:// (an http:// URL would be treated as an HTTP proxy).
    // Placeholders: 用户名=username, 密码=password, 测试ip=proxy host, 端口=port.
    urlproxy, err := url.Parse("socks5://用户名:密码@测试ip:端口")
    if err != nil {
        fmt.Println("socks5: invalid proxy URL:", err)
        return
    }
    client := &http.Client{
        Transport: &http.Transport{
            Proxy: http.ProxyURL(urlproxy),
        },
    }
    rqt, err := http.NewRequest("GET", "http://myip.top", nil)
    if err != nil {
        println("接口获取IP失败!")
        return
    }

    // Handle the response. The error must be checked before touching response:
    // on failure response is nil and response.Body.Close() would panic.
    response, err := client.Do(rqt)
    if err != nil {
        fmt.Println("socks5: request failed:", err)
        return
    }
    defer response.Body.Close()
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println("socks5: reading response failed:", err)
        return
    }

    fmt.Println("socks5:", string(body))
}

// httpproxy requests http://myip.top through the HTTP proxy and prints the
// response body. Failures are reported and the function returns early.
func httpproxy() {
    // Placeholders: 用户名=username, 密码=password, 测试ip=proxy host, 端口=port.
    urlproxy, err := url.Parse("http://用户名:密码@测试ip:端口")
    if err != nil {
        fmt.Println("http: invalid proxy URL:", err)
        return
    }
    client := &http.Client{
        Transport: &http.Transport{
            Proxy: http.ProxyURL(urlproxy),
        },
    }
    rqt, err := http.NewRequest("GET", "http://myip.top", nil)
    if err != nil {
        println("接口获取IP失败!")
        return
    }

    // Handle the response. The error must be checked before touching response:
    // on failure response is nil and response.Body.Close() would panic.
    response, err := client.Do(rqt)
    if err != nil {
        fmt.Println("http: request failed:", err)
        return
    }
    defer response.Body.Close()
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println("http: reading response failed:", err)
        return
    }

    fmt.Println("http:", string(body))
}

// httplocal requests http://myip.top directly (no proxy) and prints the
// response body, i.e. the address the target sees for this machine.
func httplocal() {
    client := &http.Client{}
    rqt, err := http.NewRequest("GET", "http://myip.top", nil)
    if err != nil {
        println("接口获取IP失败!")
        return
    }

    // Handle the response. The error must be checked before touching response:
    // on failure response is nil and response.Body.Close() would panic.
    response, err := client.Do(rqt)
    if err != nil {
        fmt.Println("local: request failed:", err)
        return
    }
    defer response.Body.Close()
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println("local: reading response failed:", err)
        return
    }

    fmt.Println("本机:", string(body))
}
// main runs the demos in order: direct request, HTTP proxy, then SOCKS5 proxy.
func main() {
    demos := []func(){httplocal, httpproxy, socksproxy}
    for _, demo := range demos {
        demo()
    }
}

PHP curl 模块示例

<?php
// PHP cURL example: request a page through an HTTP or SOCKS5 proxy.

// Target page to request
$targetUrl = "http://baidu.com";

// Proxy server, with credentials, as user:password@host:port
$proxyServer = "username:password@proxy.example.com:8080";

$ch = curl_init();

curl_setopt($ch, CURLOPT_URL, $targetUrl);

// Do not tunnel (CONNECT) through the proxy
curl_setopt($ch, CURLOPT_HTTPPROXYTUNNEL, false);

// NOTE(review): peer certificate verification is disabled; this removes TLS
// protection if the target is switched to https — enable it when possible.
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);

// Proxy protocol: use CURLPROXY_HTTP for an HTTP proxy, CURLPROXY_SOCKS5 for SOCKS5
// curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);

curl_setopt($ch, CURLOPT_PROXY, $proxyServer);

// Proxy authentication scheme
curl_setopt($ch, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);

curl_setopt($ch, CURLOPT_USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727;)");

// Seconds allowed for connecting / for the whole transfer
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 3);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);

// Include response headers in the output and return it instead of printing it
curl_setopt($ch, CURLOPT_HEADER, true);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

$result = curl_exec($ch);

if ($result === false) {
    // curl_exec() returns false on failure; report the reason instead of dumping bool(false)
    echo "cURL error (" . curl_errno($ch) . "): " . curl_error($ch) . PHP_EOL;
} else {
    var_dump($result);
}

curl_close($ch);

Python 示例

Python Requests 使用示例

#coding=utf-8
# Python Requests example: fetch a URL through an authenticated HTTP proxy.
import requests

# Target URL
targetUrl = "https://myip.top"

# Proxy server
proxyHost = "1.1.1.1"   # proxy IP
proxyPort = "1234"      # proxy port
proxyUser = "1234"      # proxy account
proxyPass = "123456"    # proxy password

# Python2
#proxyMeta = "http://%(user)s:%(password)s@%(host)s:%(port)s" % {
#    "user" : proxyUser,
#    "password" : proxyPass,
#    "host" : proxyHost,
#    "port" : proxyPort,
#}

# Python3
proxyMeta = f"http://{proxyUser}:{proxyPass}@{proxyHost}:{proxyPort}"

# The same proxy URL is used for both http and https targets
proxies = {
    "http"  : proxyMeta,
    "https"  : proxyMeta
}

# Without a timeout, requests waits indefinitely on an unresponsive proxy
resp = requests.get(targetUrl, proxies=proxies, timeout=10)
print("Use proxies:", proxies)
print("Response status code: ", resp.status_code)
print("Response content: ", resp.text)

Python aiohttp 模块示例

import asyncio
import json

import aiohttp
from aiohttp_socks import ProxyConnector


# Proxy settings; verify() combines them into a proxy URL of the form
# method://username:password@host:port
proxy_method = "http"
proxy_username = "test"
proxy_password = "test"
proxy_host = "1.1.1.1"
proxy_port = "8080"

async def verify(_) -> str:
    """Request https://myip.top through the proxy and return the reported IP.

    The positional argument is ignored. The response body is parsed as JSON
    and its "ip" field is returned; the IP, city and ISP are also printed.

    Returns:
        The "ip" value from the response, or "" when it is missing/empty or
        when any exception occurs (the exception is printed).
    """
    timeout = aiohttp.ClientTimeout(total=5)
    proxyConn = ProxyConnector.from_url(f"{proxy_method}://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}")
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.1 Safari/605.1.15",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Connection": "close"
    }
    # `async with` closes the session on every exit path, including the early
    # returns below, so no explicit session.close() is needed. Per the original
    # author's note, the session must be closed when using a random-exit tunnel:
    # otherwise too many connections accumulate and the exit IP stays the same.
    async with aiohttp.ClientSession(timeout=timeout, connector=proxyConn, headers=headers) as session:
        try:
            async with session.get(url='https://myip.top', ssl=False) as response:
                result = json.loads(await response.text())
                ip = result.get("ip")
                if not ip:
                    return ""
                # .get() so a missing city/isp field does not discard a valid IP
                print("ip", ip, result.get("city"), result.get("isp"))
                return ip
        except Exception as e:
            print("error:", e)
            return ""


async def verify_s(sem, proxy):
    """Run verify(proxy) while holding `sem` and return its result."""
    async with sem:
        outcome = await verify(proxy)
    return outcome


async def run():
    """Run 100 proxy checks, at most 5 at a time, and print success statistics."""
    task_num = 100
    limiter = asyncio.Semaphore(5)
    outcomes = await asyncio.gather(
        *(verify_s(limiter, index) for index in range(task_num))
    )
    # Keep only truthy results; each one is an IP reported by a successful check
    ip = [outcome for outcome in outcomes if outcome]
    ok = len(ip)
    print(f'总测试数量: {task_num} 成功: {ok} 成功率: {int(ok / task_num * 100)}%')
    print(f"总请求数:{task_num} 去重IP数: {len(set(ip))}")


# Script entry point: start the event loop and run the checks when executed directly
if __name__ == '__main__':
    asyncio.run(run())

Python selenium 模块示例

#coding=utf-8
# Python selenium example: drive PhantomJS through an HTTP or SOCKS5 proxy.
# NOTE(review): webdriver.PhantomJS is believed to be unavailable in Selenium 4+ —
# confirm the installed selenium version still provides it.
from selenium import webdriver

# Proxy server
proxyHost = "ip"
proxyPort = "port"
proxyUser = "username"
proxyPass = "password"
proxyType = 'http'  # or 'socks5'

# PhantomJS command-line switches: the proxy address and the proxy credentials
# are passed separately (--proxy=host:port, --proxy-auth=user:password).
service_args = [
    "--proxy-type=%s" % proxyType,
    "--proxy=%s:%s" % (proxyHost, proxyPort),
    "--proxy-auth=%s:%s" % (proxyUser, proxyPass),
]

# Target page to visit
targetUrl = "http://baidu.com"
driver = webdriver.PhantomJS(service_args=service_args)
try:
    driver.get(targetUrl)

    # print() calls so the sample runs under Python 3, like the other Python samples
    print(driver.title)
    print(driver.page_source)
finally:
    # Always shut the browser process down, even if the page load fails
    driver.quit()
 更新时间:2023-12-21 10:42