|
使用线程池来管理多个文件的上传 将多个文件分配给有限数量的线程
使用线程池来管理多个文件的上传。这个算法将多个文件分配给有限数量的线程,从而避免为每个文件创建一个新线程。我们将使用 C++ 的标准库中的线程和条件变量来实现这个线程池。
### 步骤概述
1. **创建线程池**:定义一个线程池,限制同时运行的线程数量。
2. **任务队列**:使用一个队列来存储待上传的文件任务。
3. **多线程处理**:线程从任务队列中获取文件进行上传。
4. **上传文件**:每个线程读取文件内容并上传到服务器。
- ### 示例代码
- #include <iostream>
- #include <fstream>
- #include <thread>
- #include <vector>
- #include <queue>
- #include <condition_variable>
- #include <mutex>
- #include <winsock2.h>
- #pragma comment(lib, "ws2_32.lib")
- // 上传文件的函数
- void uploadFile(const std::string& filePath, const std::string& serverIP, int port) {
- SOCKET sock = socket(AF_INET, SOCK_STREAM, 0);
- if (sock == INVALID_SOCKET) {
- std::cerr << "Socket creation failed!" << std::endl;
- return;
- }
- sockaddr_in serverAddr;
- serverAddr.sin_family = AF_INET;
- serverAddr.sin_port = htons(port);
- inet_pton(AF_INET, serverIP.c_str(), &serverAddr.sin_addr);
- if (connect(sock, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
- std::cerr << "Connection to server failed!" << std::endl;
- closesocket(sock);
- return;
- }
- std::ifstream file(filePath, std::ios::binary);
- if (!file) {
- std::cerr << "Failed to open file: " << filePath << std::endl;
- closesocket(sock);
- return;
- }
- std::vector<char> buffer((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
- send(sock, buffer.data(), buffer.size(), 0);
- closesocket(sock);
- std::cout << "File uploaded: " << filePath << std::endl;
- }
- // 线程池类
- class ThreadPool {
- public:
- ThreadPool(size_t numThreads);
- ~ThreadPool();
- void enqueue(const std::string& filePath, const std::string& serverIP, int port);
- private:
- std::vector<std::thread> workers;
- std::queue<std::pair<std::string, std::tuple<std::string, int>>> tasks;
- std::mutex queueMutex;
- std::condition_variable condition;
- bool stop;
- void worker();
- };
- // 线程池构造函数
- ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
- for (size_t i = 0; i < numThreads; ++i) {
- workers.emplace_back([this] { worker(); });
- }
- }
- // 线程池析构函数
- ThreadPool::~ThreadPool() {
- {
- std::unique_lock<std::mutex> lock(queueMutex);
- stop = true;
- }
- condition.notify_all();
- for (std::thread &worker : workers) {
- worker.join();
- }
- }
- // 向任务队列添加任务
- void ThreadPool::enqueue(const std::string& filePath, const std::string& serverIP, int port) {
- {
- std::unique_lock<std::mutex> lock(queueMutex);
- tasks.emplace(filePath, std::make_tuple(serverIP, port));
- }
- condition.notify_one();
- }
- // 工作线程处理函数
- void ThreadPool::worker() {
- while (true) {
- std::string filePath;
- std::string serverIP;
- int port;
- {
- std::unique_lock<std::mutex> lock(queueMutex);
- condition.wait(lock, [this] { return stop || !tasks.empty(); });
- if (stop && tasks.empty()) {
- return;
- }
- auto task = tasks.front();
- tasks.pop();
- filePath = task.first;
- std::tie(serverIP, port) = task.second;
- }
- uploadFile(filePath, serverIP, port);
- }
- }
- // 主函数
- int main() {
- const std::string serverIP = "127.0.0.1"; // 服务器IP
- const int port = 8080; // 服务器端口
- // 要上传的文件列表
- std::vector<std::string> filesToUpload = {
- "path/to/your/file1.txt",
- "path/to/your/file2.txt",
- "path/to/your/file3.txt",
- // 更多文件...
- };
- const size_t numThreads = 10; // 线程池中的线程数量
- ThreadPool pool(numThreads);
- // 将文件任务添加到线程池
- for (const auto& filePath : filesToUpload) {
- pool.enqueue(filePath, serverIP, port);
- }
- std::cout << "All files have been uploaded!" << std::endl;
- return 0;
- }
复制代码
### 代码说明
1. **uploadFile**:该函数负责打开指定文件,读取文件内容并上传到指定的服务器。
2. **ThreadPool 类**:
- **构造函数**:创建指定数量的工作线程。
- **enqueue**:向任务队列添加文件上传任务。
- **worker**:工作线程从任务队列中获取任务并执行上传。
3. **main**:在主函数中,定义要上传的文件列表,并将每个文件的上传任务添加到线程池中。
### 注意事项
- 确保在使用 WinSock 之前调用 `WSAStartup` ,并在程序结束时调用 `WSACleanup` 。
- 处理网络错误和异常情况,以确保程序的健壮性。
- 根据实际需求调整要上传的文件列表和服务器信息。
这个示例通过线程池的方式有效地管理了多个文件的上传任务,避免了为每个文件创建新线程的开销,提高了程序的性能和可扩展性。
|
|