Skip to content

Does the memory have residual memory? #648

@silence-coding

Description

@silence-coding

Is memory being retained in the receive buffer after it is no longer needed?

stream.pending_recv.push_back(&mut self.buffer, event);

Using the `count_buf` debugging method shown below, I found that data accumulates in `self.buffer`.

    /// Pops the next event queued on `stream.pending_recv` out of `self.buffer`.
    ///
    /// When that event is `Event::Data`, the payload is handed back as
    /// `Poll::Ready(Some(Ok(payload)))`. The remaining match arms are omitted
    /// from this excerpt.
    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        // TODO: Return error when the stream is reset
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => {
                // Debug instrumentation added for this report: periodically
                // prints how many entries are still held in `self.buffer`.
                self.count_buf(); 
                Poll::Ready(Some(Ok(payload)))
            },
    // ignore (remaining match arms omitted from this excerpt)
}
    /// Debug helper: prints how many entries `self.buffer.slab` currently holds
    /// and how many of those are `Event::Data`.
    ///
    /// Output is throttled: nothing is printed (and the timestamp is left
    /// untouched) if fewer than two whole seconds have elapsed since the last
    /// report.
    fn count_buf(&mut self) {
        if self.time.elapsed().as_secs() < 2 {
            return;
        }
        self.time = Instant::now();

        // Single pass over the slab, tallying (all entries, data entries).
        let (total, data_frames) = self.buffer.slab.iter().fold(
            (0, 0),
            |(total, data_frames), (_, slot)| match &slot.value {
                Event::Data(_) => (total + 1, data_frames + 1),
                _ => (total + 1, data_frames),
            },
        );

        println!("------------pull {}, {}", total, data_frames);
    }

Even after the client slows down to sending only one request per second, the number of entries in `self.buffer` does not go down.
image

Scenarios:
client:

use std::time::{Duration, SystemTime};
use hyper::{Body, Client, Request};
use hyper::body::Bytes;
use hyper::client::HttpConnector;
use hyper::header::CONTENT_LENGTH;

/// Client-side load generator for the reproduction.
///
/// Runs two phases against the server reached by `send_req`:
/// 1. a burst phase in which 20 background tasks, started 20 ms apart, each
///    issue 100 requests with a 50 ms pause after every request;
/// 2. a trickle phase in which this task issues 100 requests, one per second.
///
/// The process then idles for 100 seconds before exiting.
#[tokio::main]
async fn main() {
    let client = hyper::client::Client::builder().http2_only(true).build_http();

    let worker_pause = Duration::from_millis(50);
    let spawn_gap = Duration::from_millis(20);
    let trickle_pause = Duration::from_secs(1);

    // Burst phase: many concurrent senders sharing one HTTP/2 client.
    for _ in 0..20 {
        let worker_client = client.clone();
        tokio::spawn(async move {
            for _ in 0..100 {
                send_req(worker_client.clone()).await;
                tokio::time::sleep(worker_pause).await;
            }
            println!("xxx exit");
        });
        tokio::time::sleep(spawn_gap).await;
    }

    // Trickle phase: a single request per second from the main task.
    for _ in 0..100 {
        send_req(client.clone()).await;
        tokio::time::sleep(trickle_pause).await;
    }

    // Keep the process (and the client) alive for a while after the last request.
    tokio::time::sleep(Duration::from_secs(100)).await;
}

/// Sends one request with a streamed body to `http://127.0.0.1:8081`.
///
/// The body consists of `num` two-byte chunks (`"ss"`) followed by one empty
/// chunk, where `num` is the current wall-clock millisecond count modulo 1000.
/// `Content-Length` is set to `2 * num` to match. A non-success response is
/// printed with `{:?}`.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch, if the request cannot
/// be built, or if the client returns an error for the request.
async fn send_req(client: Client<HttpConnector>) {
    let (mut sender, body) = Body::channel();
    let num = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis()
        % 1000;

    // Feed the body from a separate task so the request can be in flight
    // while chunks are still being produced.
    tokio::spawn(async move {
        for _ in 0..num {
            // A failed send means the receiving side of the body channel is
            // gone; every later send would fail too, so stop here instead of
            // silently discarding the error on each iteration.
            if sender.send_data("ss".into()).await.is_err() {
                return;
            }
        }
        // Trailing empty chunk; nothing further to do if this one fails.
        let _ = sender.send_data(Bytes::new()).await;
    });

    let request = Request::builder()
        .uri("http://127.0.0.1:8081")
        .header(CONTENT_LENGTH, (2 * num).to_string())
        .body(body)
        .unwrap();
    let response = client.request(request).await.unwrap();
    if !response.status().is_success() {
        println!("{:?}", response);
    }
}

server:

/// Server-side request handler used in the reproduction.
///
/// * HTTP/1.1 requests: the body is read to completion (the outcome is
///   discarded) and a plain `"sss"` response is returned.
/// * All other versions: the request is re-labelled as HTTP/1.1 and sent on
///   through a fresh `hyper` client; a failed forward is printed with `{:?}`.
///   The reply is always `"sss"` with `content-type: application/json`,
///   whether or not the forward succeeded.
pub async fn handle(mut req: Request<Body>) -> Result<Response<Body>, Error> {
    if req.version() != Version::HTTP_11 {
        let upstream = hyper::client::Client::new();
        *req.version_mut() = Version::HTTP_11;
        let forwarded = upstream.request(req).await;
        if forwarded.is_err() {
            println!("{:?}", forwarded);
        }

        let mut reply = Response::new(Body::from("sss"));
        reply
            .headers_mut()
            .insert("content-type", "application/json".parse().unwrap());
        return Ok(reply);
    }

    // HTTP/1.1: read the whole body, ignoring whether that succeeded.
    let _ = hyper::body::to_bytes(req.into_body()).await;
    Ok(Response::new(Body::from("sss")))
}

With the `bytes` crate, the underlying shared allocation is not released as long as even a single `Bytes` reference to it remains, even if that reference is empty.
image

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions